Java class org.apache.hadoop.mapred.ShuffleConsumerPlugin: example source code

Project: hadoop    File: Shuffle.java   (this init is verbatim identical in the aliyun-oss-hadoop-fs, big-c, hadoop-2.6.0-cdh5.4.3, FlexMap, hops, and hadoop-on-lustre2 forks)
@Override
public void init(ShuffleConsumerPlugin.Context context) {
  this.context = context;

  this.reduceId = context.getReduceId();
  this.jobConf = context.getJobConf();
  this.umbilical = context.getUmbilical();
  this.reporter = context.getReporter();
  this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
  this.copyPhase = context.getCopyPhase();
  this.taskStatus = context.getStatus();
  this.reduceTask = context.getReduceTask();
  this.localMapFiles = context.getLocalMapFiles();

  scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
      this, copyPhase, context.getShuffledMapsCounter(),
      context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
  merger = createMergeManager(context);
}
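
Note: the init above only caches task-side state; the actual fetch and merge work happens later in the plugin's run(). For orientation, here is a minimal sketch of the three-method ShuffleConsumerPlugin contract (init, run, close) that any third-party implementation has to satisfy; the class name is made up for illustration and the fetch/merge logic is elided.

import java.io.IOException;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;

public class SketchShufflePlugin<K, V> implements ShuffleConsumerPlugin<K, V> {

  private ShuffleConsumerPlugin.Context<K, V> context; // cached for run()/close()
  private JobConf jobConf;

  @Override
  public void init(ShuffleConsumerPlugin.Context<K, V> context) {
    // Cache whatever run() will need, exactly as Shuffle.init() does above.
    this.context = context;
    this.jobConf = context.getJobConf();
  }

  @Override
  public RawKeyValueIterator run() throws IOException, InterruptedException {
    // A real plugin fetches map outputs, merges them, and returns an
    // iterator over the merged key/value stream for the reduce phase.
    throw new UnsupportedOperationException("fetch/merge logic elided");
  }

  @Override
  public void close() {
    // Release fetcher threads, connections, and temporary files here.
  }
}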
Project: hadoop    File: TestShufflePlugin.java   (verbatim identical in the aliyun-oss-hadoop-fs, big-c, hadoop-2.6.0-cdh5.4.3, hadoop-plus, FlexMap, hops, hadoop-TCP, hardfs, and hadoop-on-lustre2 forks)
@Override
public void init(ShuffleConsumerPlugin.Context<K, V> context) {
    // just verify that Context has kept its public interface
  context.getReduceId();
  context.getJobConf();
  context.getLocalFS();
  context.getUmbilical();
  context.getLocalDirAllocator();
  context.getReporter();
  context.getCodec();
  context.getCombinerClass();
  context.getCombineCollector();
  context.getSpilledRecordsCounter();
  context.getReduceCombineInputCounter();
  context.getShuffledMapsCounter();
  context.getReduceShuffleBytes();
  context.getFailedShuffleCounter();
  context.getMergedMapOutputsCounter();
  context.getStatus();
  context.getCopyPhase();
  context.getMergePhase();
  context.getReduceTask();
  context.getMapOutputFile();
}
Project: hadoop-plus    File: Shuffle.java   (verbatim identical in hadoop-TCP and hardfs; this older variant of init does not cache localMapFiles)
@Override
public void init(ShuffleConsumerPlugin.Context context) {
  this.context = context;

  this.reduceId = context.getReduceId();
  this.jobConf = context.getJobConf();
  this.umbilical = context.getUmbilical();
  this.reporter = context.getReporter();
  this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
  this.copyPhase = context.getCopyPhase();
  this.taskStatus = context.getStatus();
  this.reduceTask = context.getReduceTask();

  scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
      this, copyPhase, context.getShuffledMapsCounter(),
      context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
  merger = createMergeManager(context);
}
Project: lustre-connector-for-hadoop    File: LustreFsShuffle.java
@Override
public void init(ShuffleConsumerPlugin.Context context) {

    this.reduceId = context.getReduceId();
    this.jobConf = context.getJobConf();
    this.umbilical = context.getUmbilical();
    this.reporter = context.getReporter();
    this.copyPhase = context.getCopyPhase();
    this.mergePhase = context.getMergePhase();
    this.taskStatus = context.getStatus();
    this.reduceTask = context.getReduceTask();
    this.codec = context.getCodec();
    this.spilledRecordsCounter = context.getSpilledRecordsCounter();
    this.mergedMapOutputsCounter = context.getMergedMapOutputsCounter();

    jobConf.setBoolean(MRConfig.MAPRED_IFILE_READAHEAD, false);
    try {
        lustrefs = (LustreFileSystem)FileSystem.get(LustreFileSystem.NAME, jobConf);
        mapOutputDir = SharedFsPlugins.getTempPath(jobConf,
                                                   JobID.downgrade(reduceId.getJobID()));
        reduceDir = new Path(mapOutputDir,
                             String.format(SharedFsPlugins.MAP_OUTPUT,
                                           reduceId.getTaskID().getId(), 0, 0)).getParent();
        mergeTempDir = new Path(mapOutputDir, "temp");
    } catch (IOException ioe) {
        throw new RuntimeException("Map Output directory not found !!", ioe);
    }

    // Scheduler
    scheduler = new ShuffleSchedulerImpl<K, V>(
                                               jobConf, taskStatus, reduceId, this, copyPhase,
                                               context.getShuffledMapsCounter(),
                                               context.getReduceShuffleBytes(),
                                               context.getFailedShuffleCounter());

    this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);

    this.merger = new FileMerger();
    this.merger.start();
}
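
For context, FileSystem.get(LustreFileSystem.NAME, jobConf) above resolves the connector through Hadoop's standard scheme-to-class mapping. A sketch of that wiring follows; the "lustre" scheme and the implementation class name are assumptions, since the connector defines the real values via LustreFileSystem.NAME.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class LustreFsLookupSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hadoop maps fs.<scheme>.impl to a FileSystem class; the scheme and
    // class name here are assumptions, not taken from the connector's source.
    conf.set("fs.lustre.impl", "org.apache.hadoop.fs.LustreFileSystem");
    FileSystem lustrefs = FileSystem.get(URI.create("lustre:///"), conf);
    System.out.println(lustrefs.getUri());
  }
}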
Project: hadoop    File: Shuffle.java   (verbatim identical in the aliyun-oss-hadoop-fs, big-c, hadoop-2.6.0-cdh5.4.3, hadoop-plus, FlexMap, hops, hadoop-TCP, hardfs, and hadoop-on-lustre2 forks)
protected MergeManager<K, V> createMergeManager(
    ShuffleConsumerPlugin.Context context) {
  return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
      context.getLocalDirAllocator(), reporter, context.getCodec(),
      context.getCombinerClass(), context.getCombineCollector(), 
      context.getSpilledRecordsCounter(),
      context.getReduceCombineInputCounter(),
      context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
      context.getMapOutputFile());
}
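
Because createMergeManager is protected, swapping in a custom merge strategy only needs a Shuffle subclass. A minimal sketch, where MyMergeManager is a hypothetical MergeManager<K, V> implementation and its constructor arguments are illustrative:

import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapreduce.task.reduce.MergeManager;
import org.apache.hadoop.mapreduce.task.reduce.Shuffle;

public class MyShuffle<K, V> extends Shuffle<K, V> {
  @Override
  protected MergeManager<K, V> createMergeManager(
      ShuffleConsumerPlugin.Context context) {
    // MyMergeManager is hypothetical; wire in whatever state it needs
    // from the context, as MergeManagerImpl does above.
    return new MyMergeManager<K, V>(context.getReduceId(),
        context.getJobConf(), context.getLocalFS(),
        context.getLocalDirAllocator(), context.getReporter());
  }
}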
Project: hadoop    File: TestShufflePlugin.java   (verbatim identical in the aliyun-oss-hadoop-fs, big-c, hadoop-2.6.0-cdh5.4.3, hadoop-plus, FlexMap, hops, hadoop-TCP, and hardfs forks)
@Test
/**
 * A testing method instructing core hadoop to load an external ShuffleConsumerPlugin
 * as if it came from a 3rd party.
 */
public void testPluginAbility() {

  try{
    // create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin
    JobConf jobConf = new JobConf();
    jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
                     TestShufflePlugin.TestShuffleConsumerPlugin.class,
                     ShuffleConsumerPlugin.class);

    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    Class<? extends ShuffleConsumerPlugin> clazz =
      jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
    assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz);

    // load 3rd party plugin through core's factory method
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
    assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin);
  }
  catch (Exception e) {
    assertTrue("Threw exception:" + e, false);
  }
}
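
Outside of tests, the same wiring is normally done through job configuration. A sketch using the MRConfig.SHUFFLE_CONSUMER_PLUGIN key exercised above (in stock Hadoop this corresponds to the mapreduce.job.reduce.shuffle.consumer.plugin.class property); MyShufflePlugin is a hypothetical implementation:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapreduce.MRConfig;

public class PluginConfigSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // MyShufflePlugin is hypothetical; it must implement ShuffleConsumerPlugin.
    jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
                     MyShufflePlugin.class,
                     ShuffleConsumerPlugin.class);
  }
}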