Example source code for the Java class org.apache.hadoop.mapred.MROutputFiles
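
The snippets below are collected from the test suites of several Hadoop-derived projects and show how org.apache.hadoop.mapred.MROutputFiles is used, almost always as the MapOutputFile implementation handed to MergeManagerImpl on the reduce side of the shuffle. MROutputFiles resolves task-local file names (map output, spill, index and reduce input files) against the directories configured under mapreduce.cluster.local.dir. As a minimal sketch of how the class is wired up on its own, outside these tests (the local-dir value and file size are illustrative assumptions; it needs org.apache.hadoop.fs.Path, org.apache.hadoop.mapred.JobConf and org.apache.hadoop.mapreduce.MRConfig imports):

// Minimal usage sketch (illustrative, not taken from the snippets below).
JobConf conf = new JobConf();
conf.set(MRConfig.LOCAL_DIR, "/tmp/mapred/local"); // assumed local dir for the example
MROutputFiles outputFiles = new MROutputFiles();
outputFiles.setConf(conf);                         // MapOutputFile is Configurable
// Ask for a writable path for spill #0 of roughly 64 KB; the file is placed
// under one of the configured local directories (throws IOException).
Path spillPath = outputFiles.getSpillFileForWrite(0, 64 * 1024);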

Project: aliyun-oss-hadoop-fs    File: TestMergeManager.java
@Test
public void testLargeMemoryLimits() throws Exception {
  final JobConf conf = new JobConf();
  // Xmx in production
  conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
      8L * 1024 * 1024 * 1024);

  // M1 = Xmx fraction for map outputs
  conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);

  // M2 = max M1 fraction for a single map output
  conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);

  // M3 = M1 fraction at which in memory merge is triggered
  conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);

  // M4 = M1 fraction of map outputs remaining in memory for a reduce
  conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);

  final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>(
      null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
      null, null, null, null, null, new MROutputFiles());
  assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
      mgr.memoryLimit > Integer.MAX_VALUE);
  final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
  assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
      maxInMemReduce > Integer.MAX_VALUE);
}
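
The 8 GiB total matters because the shuffle memory limit in MergeManagerImpl is a long derived from mapreduce.reduce.memory.totalbytes scaled by mapreduce.reduce.shuffle.input.buffer.percent, and the test guards against that value being truncated to an int. A back-of-the-envelope version of the arithmetic being asserted (illustrative only; the exact derivation inside MergeManagerImpl can differ slightly between the versions quoted on this page):

// Rough arithmetic behind the assertions above (not Hadoop code).
long totalBytes = 8L * 1024 * 1024 * 1024;        // REDUCE_MEMORY_TOTAL_BYTES = 8 GiB
float inputBufferPercent = 1.0f;                  // SHUFFLE_INPUT_BUFFER_PERCENT (M1)
long memoryLimit = (long) (totalBytes * inputBufferPercent);
// 8589934592 > 2147483647, so the limit must stay a long to be usable
System.out.println(memoryLimit > Integer.MAX_VALUE);   // true
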
Project: hops    File: TestMergeManager.java
@Test
public void testLargeMemoryLimits() throws Exception {
  final JobConf conf = new JobConf();
  // Xmx in production
  conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
      8L * 1024 * 1024 * 1024);

  // M1 = Xmx fraction for map outputs
  conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);

  // M2 = max M1 fraction for a single map output
  conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);

  // M3 = M1 fraction at which in memory merge is triggered
  conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);

  // M4 = M1 fraction of map outputs remaining in memory for a reduce
  conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);

  final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>(
      null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
      null, null, null, null, null, new MROutputFiles());
  assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
      mgr.memoryLimit > Integer.MAX_VALUE);
  final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
  assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
      maxInMemReduce > Integer.MAX_VALUE);
  assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE",
      Integer.MAX_VALUE, mgr.maxSingleShuffleLimit);
  verifyReservedMapOutputType(mgr, 10L, "MEMORY");
  verifyReservedMapOutputType(mgr, 1L + Integer.MAX_VALUE, "DISK");
}
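
Compared with the previous snippet, this hops variant also checks that the per-map-output limit (M2) is capped at Integer.MAX_VALUE, since an in-memory map output is backed by a byte array, and that a reservation just over that cap is sent to disk; verifyReservedMapOutputType is a helper defined elsewhere in the test class. One plausible reading of the capping arithmetic, stated as an assumption rather than the exact MergeManagerImpl code:

// Assumed shape of the cap asserted above (illustrative, not verbatim Hadoop code).
long memoryLimit = (long) (8L * 1024 * 1024 * 1024 * 1.0f);             // M1
long maxSingleShuffleLimit =
    (long) Math.min(memoryLimit * 0.95f, Integer.MAX_VALUE);            // M2, capped
// A 10-byte output fits in memory ("MEMORY"); 1L + Integer.MAX_VALUE bytes cannot
// be held in a byte[] and is therefore reserved on disk instead ("DISK").
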
Project: hadoop    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
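
The final loop verifies two properties of every batch queued for the on-disk merger while it is suspended: a batch never holds more than io.sort.factor files, and the CompressAwarePath entries within a batch are ordered by compressed size. The same check could be factored into a standalone helper along these lines (a hypothetical helper, not part of Hadoop; CompressAwarePath is an inner class of MergeManagerImpl, so this assumes the code lives in the same test package):

// Hypothetical helper mirroring the batch assertions above.
static void assertBatchesWellFormed(
    List<List<MergeManagerImpl.CompressAwarePath>> batches, int sortFactor) {
  for (List<MergeManagerImpl.CompressAwarePath> batch : batches) {
    assertTrue("batch size out of range",
        batch.size() > 0 && batch.size() <= sortFactor);
    for (int j = 1; j < batch.size(); j++) {
      assertTrue("batch not sorted by compressed size",
          batch.get(j).getCompressedSize() >= batch.get(j - 1).getCompressedSize());
    }
  }
}
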
Project: aliyun-oss-hadoop-fs    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
Project: big-c    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
Project: hadoop-2.6.0-cdh5.4.3    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
Project: hadoop-plus    File: TestMerger.java
@Test
public void testInMemoryMerger() throws IOException {
  JobID jobId = new JobID("a", 0);
  TaskAttemptID reduceId = new TaskAttemptID(
      new TaskID(jobId, TaskType.REDUCE, 0), 0);
  TaskAttemptID mapId1 = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 1), 0);
  TaskAttemptID mapId2 = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 2), 0);

  LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);

  MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
      reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
      null, null, new Progress(), new MROutputFiles());

  // write map outputs
  Map<String, String> map1 = new TreeMap<String, String>();
  map1.put("apple", "disgusting");
  map1.put("carrot", "delicious");
  Map<String, String> map2 = new TreeMap<String, String>();
  map1.put("banana", "pretty good");
  byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
  byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
  InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
      conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
  InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
      conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
  System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
      mapOutputBytes1.length);
  System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
      mapOutputBytes2.length);

  // create merger and run merge
  MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
      mergeManager.createInMemoryMerger();
  List<InMemoryMapOutput<Text, Text>> mapOutputs =
      new ArrayList<InMemoryMapOutput<Text, Text>>();
  mapOutputs.add(mapOutput1);
  mapOutputs.add(mapOutput2);

  inMemoryMerger.merge(mapOutputs);

  Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
  Path outPath = mergeManager.onDiskMapOutputs.iterator().next();

  List<String> keys = new ArrayList<String>();
  List<String> values = new ArrayList<String>();
  readOnDiskMapOutput(conf, fs, outPath, keys, values);
  Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
  Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
}
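
The TestMerger snippets on this page call two helpers defined elsewhere in the class, writeMapOutput and readOnDiskMapOutput, which serialize and deserialize key/value pairs in Hadoop's IFile format. An approximate sketch of the write-side helper, for context only (an assumption; the exact upstream implementation may differ):

// Approximate shape of the writeMapOutput helper used above (assumed, not verbatim).
private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
    throws IOException {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
  IFile.Writer<Text, Text> writer =
      new IFile.Writer<Text, Text>(conf, fsdos, Text.class, Text.class, null, null);
  for (Map.Entry<String, String> entry : keysToValues.entrySet()) {
    writer.append(new Text(entry.getKey()), new Text(entry.getValue()));
  }
  writer.close();
  return baos.toByteArray();
}
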
Project: hadoop-plus    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
Project: FlexMap    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
Project: hops    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
Project: hadoop-TCP    File: TestMerger.java
@Test
public void testInMemoryMerger() throws Throwable {
  JobID jobId = new JobID("a", 0);
  TaskAttemptID reduceId = new TaskAttemptID(
      new TaskID(jobId, TaskType.REDUCE, 0), 0);
  TaskAttemptID mapId1 = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 1), 0);
  TaskAttemptID mapId2 = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 2), 0);

  LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);

  MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
      reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
      null, null, new Progress(), new MROutputFiles());

  // write map outputs
  Map<String, String> map1 = new TreeMap<String, String>();
  map1.put("apple", "disgusting");
  map1.put("carrot", "delicious");
  Map<String, String> map2 = new TreeMap<String, String>();
  map1.put("banana", "pretty good");
  byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
  byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
  InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
      conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
  InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
      conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
  System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
      mapOutputBytes1.length);
  System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
      mapOutputBytes2.length);

  // create merger and run merge
  MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
      mergeManager.createInMemoryMerger();
  List<InMemoryMapOutput<Text, Text>> mapOutputs =
      new ArrayList<InMemoryMapOutput<Text, Text>>();
  mapOutputs.add(mapOutput1);
  mapOutputs.add(mapOutput2);

  inMemoryMerger.merge(mapOutputs);

  Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
  Path outPath = mergeManager.onDiskMapOutputs.iterator().next();

  List<String> keys = new ArrayList<String>();
  List<String> values = new ArrayList<String>();
  readOnDiskMapOutput(conf, fs, outPath, keys, values);
  Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
  Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));

  mergeManager.close();
  Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
  Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
  Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
}
Project: hadoop-TCP    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
Project: hardfs    File: TestMerger.java
@Test
public void testInMemoryMerger() throws Throwable {
  JobID jobId = new JobID("a", 0);
  TaskAttemptID reduceId = new TaskAttemptID(
      new TaskID(jobId, TaskType.REDUCE, 0), 0);
  TaskAttemptID mapId1 = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 1), 0);
  TaskAttemptID mapId2 = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 2), 0);

  LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);

  MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
      reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
      null, null, new Progress(), new MROutputFiles());

  // write map outputs
  Map<String, String> map1 = new TreeMap<String, String>();
  map1.put("apple", "disgusting");
  map1.put("carrot", "delicious");
  Map<String, String> map2 = new TreeMap<String, String>();
  map1.put("banana", "pretty good");
  byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
  byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
  InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
      conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
  InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
      conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
  System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
      mapOutputBytes1.length);
  System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
      mapOutputBytes2.length);

  // create merger and run merge
  MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
      mergeManager.createInMemoryMerger();
  List<InMemoryMapOutput<Text, Text>> mapOutputs =
      new ArrayList<InMemoryMapOutput<Text, Text>>();
  mapOutputs.add(mapOutput1);
  mapOutputs.add(mapOutput2);

  inMemoryMerger.merge(mapOutputs);

  Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
  Path outPath = mergeManager.onDiskMapOutputs.iterator().next();

  List<String> keys = new ArrayList<String>();
  List<String> values = new ArrayList<String>();
  readOnDiskMapOutput(conf, fs, outPath, keys, values);
  Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
  Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));

  mergeManager.close();
  Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
  Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
  Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
}
Project: hardfs    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}
Project: hadoop-on-lustre2    File: TestMerger.java
@Test
public void testInMemoryMerger() throws Throwable {
  JobID jobId = new JobID("a", 0);
  TaskAttemptID reduceId = new TaskAttemptID(
      new TaskID(jobId, TaskType.REDUCE, 0), 0);
  TaskAttemptID mapId1 = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 1), 0);
  TaskAttemptID mapId2 = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 2), 0);

  LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);

  MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
      reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
      null, null, new Progress(), new MROutputFiles());

  // write map outputs
  Map<String, String> map1 = new TreeMap<String, String>();
  map1.put("apple", "disgusting");
  map1.put("carrot", "delicious");
  Map<String, String> map2 = new TreeMap<String, String>();
  map1.put("banana", "pretty good");
  byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
  byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
  InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
      conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
  InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
      conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
  System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
      mapOutputBytes1.length);
  System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
      mapOutputBytes2.length);

  // create merger and run merge
  MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
      mergeManager.createInMemoryMerger();
  List<InMemoryMapOutput<Text, Text>> mapOutputs =
      new ArrayList<InMemoryMapOutput<Text, Text>>();
  mapOutputs.add(mapOutput1);
  mapOutputs.add(mapOutput2);

  inMemoryMerger.merge(mapOutputs);

  Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
  Path outPath = mergeManager.onDiskMapOutputs.iterator().next();

  List<String> keys = new ArrayList<String>();
  List<String> values = new ArrayList<String>();
  readOnDiskMapOutput(conf, fs, outPath, keys, values);
  Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
  Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));

  mergeManager.close();
  Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
  Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
  Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
}
Project: hadoop-on-lustre2    File: TestMergeManager.java
@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
  InterruptedException {
  JobConf jobConf = new JobConf();
  final int SORT_FACTOR = 5;
  jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);

  MapOutputFile mapOutputFile = new MROutputFiles();
  FileSystem fs = FileSystem.getLocal(jobConf);
  MergeManagerImpl<IntWritable, IntWritable> manager =
    new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
      , null, null, null, null, null, null, null, null, null, mapOutputFile);

  MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
    onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
      IntWritable, IntWritable>) Whitebox.getInternalState(manager,
        "onDiskMerger");
  int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
    "mergeFactor");

  // make sure the io.sort.factor is set properly
  assertEquals(mergeFactor, SORT_FACTOR);

  // Stop the onDiskMerger thread so that we can intercept the list of files
  // waiting to be merged.
  onDiskMerger.suspend();

  //Send the list of fake files waiting to be merged
  Random rand = new Random();
  for(int i = 0; i < 2*SORT_FACTOR; ++i) {
    Path path = new Path("somePath");
    CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
    manager.closeOnDiskFile(cap);
  }

  //Check that the files pending to be merged are in sorted order.
  LinkedList<List<CompressAwarePath>> pendingToBeMerged =
    (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
      onDiskMerger, "pendingToBeMerged");
  assertTrue("No inputs were added to list pending to merge",
    pendingToBeMerged.size() > 0);
  for(int i = 0; i < pendingToBeMerged.size(); ++i) {
    List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
    for(int j = 1; j < inputs.size(); ++j) {
      assertTrue("Not enough / too many inputs were going to be merged",
        inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
      assertTrue("Inputs to be merged were not sorted according to size: ",
        inputs.get(j).getCompressedSize()
        >= inputs.get(j-1).getCompressedSize());
    }
  }

}