Example source code for the Java class org.apache.hadoop.mapred.MapOutputCollector

Project: mr-collector-benchmark    File: MapOutputCollectorBenchmark.java
/**
 * Feed {@code NUM_RECORDS} pseudo-random, terasort-shaped records (10-byte
 * key, 90-byte value) into the given collector, assigning records
 * round-robin across {@code NUM_PARTITIONS} partitions.
 */
@SuppressWarnings("unchecked")
private void collectRecords(MapOutputCollector collector)
    throws IOException, InterruptedException {
  // Fixed seed: every collector implementation sees identical input,
  // so runs are directly comparable.
  XorshiftRandom rng = new XorshiftRandom(1);

  // Terasort record layout: 10 bytes of key, 90 bytes of value.
  BytesWritable key = new BytesWritable();
  key.setSize(10);
  BytesWritable value = new BytesWritable();
  value.setSize(90);
  rng.nextBytes(value.getBytes());

  // Grab the key's backing array once and refill it in place each
  // iteration; only the key bytes vary between records.
  byte[] keyBuffer = key.getBytes();
  for (int record = 0; record < NUM_RECORDS; record++) {
    rng.nextBytes(keyBuffer);
    collector.collect(key, value, record % NUM_PARTITIONS);
  }
}
Project: mr-collector-benchmark    File: MapOutputCollectorBenchmark.java
/**
 * Benchmark the given collector implementation over 30 iterations,
 * forcing garbage collection before each one so measurements reflect
 * the collection work itself rather than leftover heap state.
 */
public void runTest(Class<? extends MapOutputCollector> collector) throws Exception {
  final int iterations = 30;
  for (int run = 0; run < iterations; run++) {
    // Triple GC pass to settle the heap before timing starts.
    System.gc();
    System.gc();
    System.gc();
    doBenchmark(collector);
  }
}
Project: mr-collector-benchmark    File: MapOutputCollectorBenchmark.java
/**
 * Entry point. Expects exactly one argument: the fully-qualified name of
 * the {@link MapOutputCollector} implementation to benchmark.
 */
public static void main(String[] args) throws Exception {
  Preconditions.checkArgument(args.length == 1,
      "Usage: " + MapOutputCollectorBenchmark.class.getName() +
      " <collector class name>");
  // Resolve and type-check the requested collector class up front so a
  // bad argument fails fast with a ClassCastException.
  Class<? extends MapOutputCollector> collectorClass =
      Class.forName(args[0]).asSubclass(MapOutputCollector.class);
  new MapOutputCollectorBenchmark().runTest(collectorClass);
}
Project: mr-collector-benchmark    File: MapOutputCollectorBenchmark.java
/**
 * Set up a faked-out map-task environment, run one full
 * collect/sort/spill pass through the given collector implementation,
 * and print resource usage (CPU, GC, JIT compilation, wall time) for
 * the run.
 *
 * @param collectorClazz the MapOutputCollector implementation under test
 * @throws Exception if collector construction, collection, or the
 *         reflective TaskReporter setup fails
 */
private void doBenchmark(Class<? extends MapOutputCollector> collectorClazz)
  throws Exception {
  JobConf jobConf = new JobConf();
  jobConf.setInt("io.sort.mb", 600);
  jobConf.setInt("io.file.buffer.size", 128*1024);
  jobConf.setMapOutputKeyClass(BytesWritable.class);
  jobConf.setMapOutputValueClass(BytesWritable.class);
  jobConf.setNumReduceTasks(NUM_PARTITIONS);
  jobConf.set(JobContext.TASK_ATTEMPT_ID, "test_attempt");

  // Fake out a bunch of stuff to make a task context.
  MapOutputFile output = new YarnOutputFiles();
  output.setConf(jobConf);

  Progress mapProgress = new Progress();
  mapProgress.addPhase("map");
  mapProgress.addPhase("sort");

  MapTask mapTask = Mockito.mock(MapTask.class);
  Mockito.doReturn(output).when(mapTask).getMapOutputFile();
  Mockito.doReturn(true).when(mapTask).isMapTask();
  Mockito.doReturn(new TaskAttemptID("fake-jt", 12345, TaskType.MAP, 1, 1)).when(mapTask).getTaskID();
  Mockito.doReturn(mapProgress).when(mapTask).getSortPhase();

  // TaskReporter's constructor is not public, so reach it via reflection.
  MapTask t = new MapTask();
  Constructor<TaskReporter> constructor =
      TaskReporter.class.getDeclaredConstructor(Task.class,
          Progress.class, TaskUmbilicalProtocol.class);
  constructor.setAccessible(true);
  TaskReporter reporter = constructor.newInstance(t, mapProgress, null);
  reporter.setProgress(0.0f);
  Context context = new MapOutputCollector.Context(mapTask, jobConf, reporter);

  // Actually run the map sort. close() runs in a finally block so that a
  // failure during init/collect/flush still releases the collector's sort
  // buffers and spill files before the next benchmark iteration.
  ResourceTimer timer = new ResourceTimer();
  MapOutputCollector<?,?> collector = ReflectionUtils.newInstance(collectorClazz, jobConf);
  try {
    collector.init(context);
    collectRecords(collector);
    collector.flush();
  } finally {
    collector.close();
  }

  // Print results
  System.out.println("---------------------");
  System.out.println("Results for " + collectorClazz.getName() + ":");
  System.out.println("CPU time: " + timer.elapsedCpu() + "ms");
  System.out.println("CPU time (only this thread): " + timer.elapsedCpuThisThread() + "ms");
  System.out.println("Compilation time: " + timer.elapsedCompilation());
  System.out.println("GC time: " + timer.elapsedGC());
  System.out.println("Wall time: " + timer.elapsedWall());
  System.out.println("---------------------");
}