Example source code for the Java class org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues
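
A minimal sketch of the pattern shared by all of the snippets below, assuming the ResourceCalculatorPlugin API they use: sample ProcResourceValues before and after a phase (sort, spill, merge), then record the CPU-time delta and the wall-clock time. The CpuDeltaSketch class and its method names are hypothetical; only getProcResourceValues() and getCumulativeCpuTime() come from the examples themselves.

import org.apache.hadoop.util.ResourceCalculatorPlugin;
import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;

// Hypothetical helper illustrating the start/end sampling pattern used below.
public class CpuDeltaSketch {

  // CPU time (milliseconds) consumed between two samples; guards against null
  // samples and non-increasing readings, as the getCPUVal() helpers below do.
  static long cpuDelta(ProcResourceValues start, ProcResourceValues end) {
    if (start == null || end == null) {
      return 0;
    }
    long startCpu = start.getCumulativeCpuTime();
    long endCpu = end.getCumulativeCpuTime();
    return endCpu > startCpu ? endCpu - startCpu : 0;
  }

  // Sample around an arbitrary phase and report both CPU and wall-clock time,
  // mirroring what incCountersPerSpill()/incMergeCounters() do with counters.
  static void measurePhase(ResourceCalculatorPlugin calc, Runnable phase) {
    long wallStart = System.currentTimeMillis();
    ProcResourceValues before = calc.getProcResourceValues();
    phase.run();
    ProcResourceValues after = calc.getProcResourceValues();
    long wallClock = System.currentTimeMillis() - wallStart;
    System.out.println("phase CPU ms = " + cpuDelta(before, after)
        + ", wall-clock ms = " + wallClock);
  }
}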

Project: hadoop-2.6.0-cdh5.4.3    File: Task.java
/**
* Update resource information counters
*/
void updateResourceCounters() {
  // Update generic resource counters
  updateHeapUsageCounter();

  if (resourceCalculator == null) {
    return;
  }
  ProcResourceValues res = resourceCalculator.getProcResourceValues();
  long cpuTime = res.getCumulativeCpuTime();
  long pMem = res.getPhysicalMemorySize();
  long vMem = res.getVirtualMemorySize();
  // Remove the CPU time consumed previously by JVM reuse
  cpuTime -= initCpuCumulativeTime;
  counters.findCounter(Counter.CPU_MILLISECONDS).setValue(cpuTime);
  counters.findCounter(Counter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
  counters.findCounter(Counter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
Project: hadoop-EAR    File: BlockMapOutputBuffer.java
protected ProcResourceValues sortReduceParts() {
  long sortStartMilli = System.currentTimeMillis();
  ProcResourceValues sortStartProcVals =
      task.getCurrentProcResourceValues();
  long sortStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  // sort
  for (int i = 0; i < reducePartitions.length; i++) {
    reducePartitions[i].groupOrSort();
  }
  long sortEndMilli = System.currentTimeMillis();
  ProcResourceValues sortEndProcVals =
      task.getCurrentProcResourceValues();
  long sortEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incCountersPerSort(sortStartProcVals,
      sortEndProcVals, sortEndMilli - sortStartMilli);
  mapSpillSortCounter.incJVMCPUPerSort(sortStart, sortEnd);
  return sortEndProcVals;
}
Project: hadoop-EAR    File: BlockMapOutputBuffer.java
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // if there is already at least one spill, we can try to hold this last
    // spill in memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(),
              true);
    }
    hasInMemorySpill = true;
  } else {
    sortAndSpill();      
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  long mergeStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  long mergeEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
  mapSpillSortCounter.incJVMCPUMerge(mergeStart, mergeEnd);
}
Project: hadoop-on-lustre    File: Task.java
/**
* Update resource information counters
*/
void updateResourceCounters() {
  // Update generic resource counters
  updateHeapUsageCounter();

  if (resourceCalculator == null) {
    return;
  }
  ProcResourceValues res = resourceCalculator.getProcResourceValues();
  long cpuTime = res.getCumulativeCpuTime();
  long pMem = res.getPhysicalMemorySize();
  long vMem = res.getVirtualMemorySize();
  // Remove the CPU time consumed previously by JVM reuse
  cpuTime -= initCpuCumulativeTime;
  counters.findCounter(Counter.CPU_MILLISECONDS).setValue(cpuTime);
  counters.findCounter(Counter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
  counters.findCounter(Counter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
Project: RDFS    File: BlockMapOutputBuffer.java
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // if there is already at least one spill, we can try to hold this last
    // spill in memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(),
              true);
    }
    hasInMemorySpill = true;
  } else {
    sortAndSpill();      
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
}
Project: hanoi-hadoop-2.0.0-cdh    File: Task.java
/**
* Update resource information counters
*/
void updateResourceCounters() {
  // Update generic resource counters
  updateHeapUsageCounter();

  if (resourceCalculator == null) {
    return;
  }
  ProcResourceValues res = resourceCalculator.getProcResourceValues();
  long cpuTime = res.getCumulativeCpuTime();
  long pMem = res.getPhysicalMemorySize();
  long vMem = res.getVirtualMemorySize();
  // Remove the CPU time consumed previously by JVM reuse
  cpuTime -= initCpuCumulativeTime;
  counters.findCounter(Counter.CPU_MILLISECONDS).setValue(cpuTime);
  counters.findCounter(Counter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
  counters.findCounter(Counter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
Project: hortonworks-extension    File: Task.java
/**
* Update resource information counters
*/
void updateResourceCounters() {
  // Update generic resource counters
  updateHeapUsageCounter();

  if (resourceCalculator == null) {
    return;
  }
  ProcResourceValues res = resourceCalculator.getProcResourceValues();
  long cpuTime = res.getCumulativeCpuTime();
  long pMem = res.getPhysicalMemorySize();
  long vMem = res.getVirtualMemorySize();
  // Remove the CPU time consumed previously by JVM reuse
  cpuTime -= initCpuCumulativeTime;
  counters.findCounter(Counter.CPU_MILLISECONDS).setValue(cpuTime);
  counters.findCounter(Counter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
  counters.findCounter(Counter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
Project: hadoop-EAR    File: MapSpillSortCounters.java
public void incCountersPerSpill(ProcResourceValues spillStartProcVals,
    ProcResourceValues spillEndProcVals, long wallClockVal,
    long spillBytesVal) {
  numSpillsVal += 1;
  long cpuUsedBySpill = getCPUVal(spillStartProcVals, spillEndProcVals);
  mapSpillCPUVal += cpuUsedBySpill;
  mapSpillWallClockVal += wallClockVal;
  mapSpillBytesVal += spillBytesVal;
}
Project: hadoop-EAR    File: MapSpillSortCounters.java
public void incMergeCounters(ProcResourceValues mergeStartProcVals,
    ProcResourceValues mergeEndProcVals, long wallClockVal) {
  long cpuUsedByMerge = this
      .getCPUVal(mergeStartProcVals, mergeEndProcVals);
  mapMergeCPUVal += cpuUsedByMerge;
  this.mapMergeWallClockVal += wallClockVal;
}
Project: hadoop-EAR    File: MapSpillSortCounters.java
private long getCPUVal(ProcResourceValues startProcVals,
    ProcResourceValues endProcVals) {
  long cpuUsed = 0;
  if (startProcVals != null && endProcVals != null) {
    long cpuStartVal = startProcVals.getCumulativeCpuTime();
    long cpuEndVal = endProcVals.getCumulativeCpuTime();
    if (cpuEndVal > cpuStartVal) {
      cpuUsed = cpuEndVal - cpuStartVal;
    }
  }
  return cpuUsed;
}
Project: hadoop-EAR    File: MapTask.java
public void incCountersPerSpill(ProcResourceValues spillStartProcVals,
    ProcResourceValues spillEndProcVals, long wallClockVal,
    long spillBytesVal) {
  numSpillsVal += 1;
  long cpuUsedBySpill = getCPUVal(spillStartProcVals, spillEndProcVals);
  mapSpillCPUVal += cpuUsedBySpill;
  mapSpillWallClockVal += wallClockVal;
  mapSpillBytesVal += spillBytesVal;
}
Project: hadoop-EAR    File: MapTask.java
public void incMergeCounters(ProcResourceValues mergeStartProcVals,
    ProcResourceValues mergeEndProcVals, long wallClockVal) {
  long cpuUsedByMerge = this
      .getCPUVal(mergeStartProcVals, mergeEndProcVals);
  mapMergeCPUVal += cpuUsedByMerge;
  this.mapMergeWallClockVal += wallClockVal;
}
Project: hadoop-EAR    File: MapTask.java
private long getCPUVal(ProcResourceValues startProcVals,
    ProcResourceValues endProcVals) {
  long cpuUsed = 0;
  if (startProcVals != null && endProcVals != null) {
    long cpuStartVal = startProcVals.getCumulativeCpuTime();
    long cpuEndVal = endProcVals.getCumulativeCpuTime();
    if (cpuEndVal > cpuStartVal) {
      cpuUsed = cpuEndVal - cpuStartVal;
    }
  }
  return cpuUsed;
}
Project: hadoop-EAR    File: ReduceTask.java
private void setCPUCounter(ProcResourceValues startProcVals,
    ProcResourceValues endProcVals,
    org.apache.hadoop.mapred.Counters.Counter counter) {
  long cpuUsed = 0;
  if (startProcVals != null && endProcVals != null) {
    long cpuStartVal = startProcVals.getCumulativeCpuTime();
    long cpuEndVal = endProcVals.getCumulativeCpuTime();
    if (cpuEndVal > cpuStartVal) {
      cpuUsed = cpuEndVal - cpuStartVal;
    }
  }
  counter.setValue(cpuUsed);
}
Project: RDFS    File: MapSpillSortCounters.java
public void incCountersPerSpill(ProcResourceValues spillStartProcVals,
    ProcResourceValues spillEndProcVals, long wallClockVal,
    long spillBytesVal) {
  numSpillsVal += 1;
  long cpuUsedBySpill = getCPUVal(spillStartProcVals, spillEndProcVals);
  mapSpillCPUVal += cpuUsedBySpill;
  mapSpillWallClockVal += wallClockVal;
  mapSpillBytesVal += spillBytesVal;
}
Project: RDFS    File: MapSpillSortCounters.java
public void incMergeCounters(ProcResourceValues mergeStartProcVals,
    ProcResourceValues mergeEndProcVals, long wallClockVal) {
  long cpuUsedByMerge = this
      .getCPUVal(mergeStartProcVals, mergeEndProcVals);
  mapMergeCPUVal += cpuUsedByMerge;
  this.mapMergeWallClockVal += wallClockVal;
}
Project: RDFS    File: MapSpillSortCounters.java
private long getCPUVal(ProcResourceValues startProcVals,
    ProcResourceValues endProcVals) {
  long cpuUsed = 0;
  if (startProcVals != null && endProcVals != null) {
    long cpuStartVal = startProcVals.getCumulativeCpuTime();
    long cpuEndVal = endProcVals.getCumulativeCpuTime();
    if (cpuEndVal > cpuStartVal) {
      cpuUsed = cpuEndVal - cpuStartVal;
    }
  }
  return cpuUsed;
}
Project: RDFS    File: MapTask.java
public void incCountersPerSpill(ProcResourceValues spillStartProcVals,
    ProcResourceValues spillEndProcVals, long wallClockVal,
    long spillBytesVal) {
  numSpillsVal += 1;
  long cpuUsedBySpill = getCPUVal(spillStartProcVals, spillEndProcVals);
  mapSpillCPUVal += cpuUsedBySpill;
  mapSpillWallClockVal += wallClockVal;
  mapSpillBytesVal += spillBytesVal;
}
Project: RDFS    File: MapTask.java
public void incMergeCounters(ProcResourceValues mergeStartProcVals,
    ProcResourceValues mergeEndProcVals, long wallClockVal) {
  long cpuUsedByMerge = this
      .getCPUVal(mergeStartProcVals, mergeEndProcVals);
  mapMergeCPUVal += cpuUsedByMerge;
  this.mapMergeWallClockVal += wallClockVal;
}
Project: RDFS    File: MapTask.java
private long getCPUVal(ProcResourceValues startProcVals,
    ProcResourceValues endProcVals) {
  long cpuUsed = 0;
  if (startProcVals != null && endProcVals != null) {
    long cpuStartVal = startProcVals.getCumulativeCpuTime();
    long cpuEndVal = endProcVals.getCumulativeCpuTime();
    if (cpuEndVal > cpuStartVal) {
      cpuUsed = cpuEndVal - cpuStartVal;
    }
  }
  return cpuUsed;
}
Project: RDFS    File: ReduceTask.java
private void setCPUCounter(ProcResourceValues startProcVals,
    ProcResourceValues endProcVals,
    org.apache.hadoop.mapred.Counters.Counter counter) {
  long cpuUsed = 0;
  if (startProcVals != null && endProcVals != null) {
    long cpuStartVal = startProcVals.getCumulativeCpuTime();
    long cpuEndVal = endProcVals.getCumulativeCpuTime();
    if (cpuEndVal > cpuStartVal) {
      cpuUsed = cpuEndVal - cpuStartVal;
    }
  }
  counter.setValue(cpuUsed);
}
Project: RDFS    File: BlockMapOutputBuffer.java
protected ProcResourceValues sortReduceParts() {
  long sortStartMilli = System.currentTimeMillis();
  ProcResourceValues sortStartProcVals =
      task.getCurrentProcResourceValues();
  // sort
  for (int i = 0; i < reducePartitions.length; i++) {
    reducePartitions[i].groupOrSort();
  }
  long sortEndMilli = System.currentTimeMillis();
  ProcResourceValues sortEndProcVals =
      task.getCurrentProcResourceValues();
  mapSpillSortCounter.incCountersPerSort(sortStartProcVals,
      sortEndProcVals, sortEndMilli - sortStartMilli);
  return sortEndProcVals;
}
Project: hadoop-EAR    File: MapSpillSortCounters.java
public void incCountersPerSort(ProcResourceValues sortStartProcVals,
    ProcResourceValues sortEndProcVals, long wallClockVal) {
  long cpuUsedBySort = getCPUVal(sortStartProcVals, sortEndProcVals);
  mapMemSortCPUVal += cpuUsedBySort;
  mapMemSortWallClockVal += wallClockVal;
}
Project: hadoop-EAR    File: MapTask.java
public void incCountersPerSort(ProcResourceValues sortStartProcVals,
    ProcResourceValues sortEndProcVals, long wallClockVal) {
  long cpuUsedBySort = getCPUVal(sortStartProcVals, sortEndProcVals);
  mapMemSortCPUVal += cpuUsedBySort;
  mapMemSortWallClockVal += wallClockVal;
}
Project: hadoop-EAR    File: MapTask.java
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {

    long spillStartMilli = System.currentTimeMillis();
    ProcResourceValues spillStartProcVals = getCurrentProcResourceValues();
    long spillStart = jmxThreadInfoTracker.getCurrentThreadCPUTime();
    long spillBytes = 0;

    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, size);
    out = rfs.create(filename);

    // we don't run the combiner for a single record
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                        spilledRecordsCounter);

        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);

        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }

    long spillEndMilli = System.currentTimeMillis();
    ProcResourceValues spillEndProcVals = getCurrentProcResourceValues();
    long spillEnd = jmxThreadInfoTracker.getCurrentThreadCPUTime();
    spillSortCounters.incCountersPerSpill(spillStartProcVals,
        spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
    spillSortCounters.incJVMCPUPerSpill(spillStart, spillEnd);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
Project: hadoop-EAR    File: BlockMapOutputBuffer.java
@Override
public void sortAndSpill() throws IOException {
  ProcResourceValues sortEndProcVals = sortReduceParts();
  long sortEndMilli = System.currentTimeMillis();
  long spillStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  // spill
  FSDataOutputStream out = null;
  long spillBytes = 0;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        task.mapOutputFile
            .getSpillFileForWrite(getTaskID(), numSpills,
                this.memoryBlockAllocator.getEstimatedSize());
    out = rfs.create(filename);
    for (int i = 0; i < partitions; ++i) {
      IndexRecord rec =
          reducePartitions[i].spill(job, out, keyClass, valClass,
              codec, task.spilledRecordsCounter);
      // record offsets
      spillBytes += rec.partLength;
      spillRec.putIndex(rec, i);
    }

    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename =
          task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(),
              numSpills, partitions
                  * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null)
      out.close();
  }

  long spillEndMilli = System.currentTimeMillis();
  ProcResourceValues spillEndProcVals =
      task.getCurrentProcResourceValues();
  long spillEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incCountersPerSpill(sortEndProcVals,
      spillEndProcVals, spillEndMilli - sortEndMilli, spillBytes);
  mapSpillSortCounter.incJVMCPUPerSpill(spillStart, spillEnd);
}
Project: hadoop-EAR    File: BlockMapOutputBuffer.java
public void spillSingleRecord(K key, V value, int part)
    throws IOException {

  ProcResourceValues spillStartProcVals =
      task.getCurrentProcResourceValues();
  long spillStartMilli = System.currentTimeMillis();
  long spillStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  // spill
  FSDataOutputStream out = null;
  long spillBytes = 0;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        task.mapOutputFile.getSpillFileForWrite(getTaskID(),
            numSpills, key.getLength() + value.getLength());
    out = rfs.create(filename);
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer =
            new IFile.Writer<K, V>(job, out, keyClass, valClass,
                codec, task.spilledRecordsCounter);
        if (i == part) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter
              .increment(out.getPos() - recordStart);
        }
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);
        writer = null;
      } catch (IOException e) {
        if (null != writer)
          writer.close();
        throw e;
      }
    }

    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename =
          task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(),
              numSpills, partitions
                  * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }

    LOG.info("Finished spill big record " + numBigRecordsSpills);
    ++numBigRecordsSpills;
    ++numSpills;
  } finally {
    if (out != null)
      out.close();
  }

  long spillEndMilli = System.currentTimeMillis();
  ProcResourceValues spillEndProcVals =
      task.getCurrentProcResourceValues();
  mapSpillSortCounter.incCountersPerSpill(spillStartProcVals,
      spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
  long spillEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incJVMCPUPerSpill(spillStart, spillEnd);
  mapSpillSortCounter.incSpillSingleRecord();
}
Project: RDFS    File: MapSpillSortCounters.java
public void incCountersPerSort(ProcResourceValues sortStartProcVals,
    ProcResourceValues sortEndProcVals, long wallClockVal) {
  long cpuUsedBySort = getCPUVal(sortStartProcVals, sortEndProcVals);
  mapMemSortCPUVal += cpuUsedBySort;
  mapMemSortWallClockVal += wallClockVal;
}
Project: RDFS    File: MapTask.java
public void incCountersPerSort(ProcResourceValues sortStartProcVals,
    ProcResourceValues sortEndProcVals, long wallClockVal) {
  long cpuUsedBySort = getCPUVal(sortStartProcVals, sortEndProcVals);
  mapMemSortCPUVal += cpuUsedBySort;
  mapMemSortWallClockVal += wallClockVal;
}
Project: RDFS    File: MapTask.java
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {

    long spillStartMilli = System.currentTimeMillis();
    ProcResourceValues spillStartProcVals = getCurrentProcResourceValues();
    long spillBytes = 0;

    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, size);
    out = rfs.create(filename);

    // we don't run the combiner for a single record
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                        spilledRecordsCounter);

        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);

        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }

    long spillEndMilli = System.currentTimeMillis();
    ProcResourceValues spillEndProcVals = getCurrentProcResourceValues();
    spillSortCounters.incCountersPerSpill(spillStartProcVals,
        spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
Project: RDFS    File: BlockMapOutputBuffer.java
@Override
public void sortAndSpill() throws IOException {
  ProcResourceValues sortEndProcVals = sortReduceParts();
  long sortEndMilli = System.currentTimeMillis();
  // spill
  FSDataOutputStream out = null;
  long spillBytes = 0;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        task.mapOutputFile
            .getSpillFileForWrite(getTaskID(), numSpills,
                this.memoryBlockAllocator.getEstimatedSize());
    out = rfs.create(filename);
    for (int i = 0; i < partitions; ++i) {
      IndexRecord rec =
          reducePartitions[i].spill(job, out, keyClass, valClass,
              codec, task.spilledRecordsCounter);
      // record offsets
      spillBytes += rec.partLength;
      spillRec.putIndex(rec, i);
    }

    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename =
          task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(),
              numSpills, partitions
                  * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null)
      out.close();
  }

  long spillEndMilli = System.currentTimeMillis();
  ProcResourceValues spillEndProcVals =
      task.getCurrentProcResourceValues();
  mapSpillSortCounter.incCountersPerSpill(sortEndProcVals,
      spillEndProcVals, spillEndMilli - sortEndMilli, spillBytes);
}
Project: RDFS    File: BlockMapOutputBuffer.java
public void spillSingleRecord(K key, V value, int part)
    throws IOException {

  ProcResourceValues spillStartProcVals =
      task.getCurrentProcResourceValues();
  long spillStartMilli = System.currentTimeMillis();
  // spill
  FSDataOutputStream out = null;
  long spillBytes = 0;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        task.mapOutputFile.getSpillFileForWrite(getTaskID(),
            numSpills, key.getLength() + value.getLength());
    out = rfs.create(filename);
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer =
            new IFile.Writer<K, V>(job, out, keyClass, valClass,
                codec, task.spilledRecordsCounter);
        if (i == part) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter
              .increment(out.getPos() - recordStart);
        }
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);
        writer = null;
      } catch (IOException e) {
        if (null != writer)
          writer.close();
        throw e;
      }
    }

    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename =
          task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(),
              numSpills, partitions
                  * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }

    LOG.info("Finished spill big record " + numBigRecordsSpills);
    ++numBigRecordsSpills;
    ++numSpills;
  } finally {
    if (out != null)
      out.close();
  }

  long spillEndMilli = System.currentTimeMillis();
  ProcResourceValues spillEndProcVals =
      task.getCurrentProcResourceValues();
  mapSpillSortCounter.incCountersPerSpill(spillStartProcVals,
      spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
  mapSpillSortCounter.incSpillSingleRecord();
}