Java 类org.apache.hadoop.mapred.Reducer 实例源码

项目:aliyun-oss-hadoop-fs    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:big-c    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:flink    文件:HadoopReduceCombineFunction.java   
/**
 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 */
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
                            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner, JobConf conf) {
    if (hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if (hadoopCombiner == null) {
        throw new NullPointerException("Combiner may not be null.");
    }
    if (conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }

    this.reducer = hadoopReducer;
    this.combiner = hadoopCombiner;
    this.jobConf = conf;
}
项目:flink    文件:HadoopReduceCombineFunction.java   
/**
 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
 * 
 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 */
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
                            Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
    if(hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if(hadoopCombiner == null) {
        throw new NullPointerException("Combiner may not be null.");
    }
    if(conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }

    this.reducer = hadoopReducer;
    this.combiner = hadoopCombiner;
    this.jobConf = conf;
}
项目:flink    文件:HadoopReduceFunction.java   
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.reducer.configure(jobConf);

    this.reporter = new HadoopDummyReporter();
    this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
    Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
    TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
    this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:hadoop-EAR    文件:PipeReducer.java   
public void configure(JobConf job) {
    super.configure(job);
    Class<?> c = job.getClass("stream.reduce.posthook", null, Mapper.class);
    if(c != null) {
        postMapper = (Mapper)ReflectionUtils.newInstance(c, job);
        LOG.info("PostHook="+c.getName());
    }

    c = job.getClass("stream.reduce.prehook", null, Reducer.class);
    if(c != null) {
        preReducer = (Reducer)ReflectionUtils.newInstance(c, job);
        oc = new InmemBufferingOutputCollector();
        LOG.info("PreHook="+c.getName());
    }
    this.ignoreKey = job.getBoolean("stream.reduce.ignoreKey", false);
}
项目:hadoop-plus    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:FlexMap    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:hops    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:hadoop-TCP    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:hardfs    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:hadoop-on-lustre2    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:RDFS    文件:PipeReducer.java   
public void configure(JobConf job) {
    super.configure(job);
    Class<?> c = job.getClass("stream.reduce.posthook", null, Mapper.class);
    if(c != null) {
        postMapper = (Mapper)ReflectionUtils.newInstance(c, job);
        LOG.info("PostHook="+c.getName());
    }

    c = job.getClass("stream.reduce.prehook", null, Reducer.class);
    if(c != null) {
        preReducer = (Reducer)ReflectionUtils.newInstance(c, job);
        oc = new InmemBufferingOutputCollector();
        LOG.info("PreHook="+c.getName());
    }
    this.ignoreKey = job.getBoolean("stream.reduce.ignoreKey", false);
}
项目:vs.msc.ws14    文件:HadoopReduceCombineFunction.java   
/**
 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
 * 
 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 */
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
                            Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
    if(hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if(hadoopCombiner == null) {
        throw new NullPointerException("Combiner may not be null.");
    }
    if(conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }

    this.reducer = hadoopReducer;
    this.combiner = hadoopCombiner;
    this.jobConf = conf;
}
项目:incubator-tez    文件:MRCombiner.java   
private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
  Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);

  Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);

  OutputCollector collector = new OutputCollector() {
    @Override
    public void collect(Object key, Object value) throws IOException {
      writer.append(key, value);
    }
  };

  CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);

  while (values.moveToNext()) {
    combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
  }
}
项目:mapreduce-fork    文件:MergeManager.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:tez    文件:MRCombiner.java   
private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
  Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);

  Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);

  OutputCollector collector = new OutputCollector() {
    @Override
    public void collect(Object key, Object value) throws IOException {
      writer.append(key, value);
      combineOutputRecordsCounter.increment(1);
    }
  };

  CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);

  while (values.moveToNext()) {
    combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
  }
}
项目:hadoop    文件:MergeManagerImpl.java   
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator = 
    (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
项目:flink    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.reducer.configure(jobConf);
    this.combiner.configure(jobConf);

    this.reporter = new HadoopDummyReporter();
    Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
    TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
    this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
    this.combineCollector = new HadoopOutputCollector<>();
    this.reduceCollector = new HadoopOutputCollector<>();
}
项目:flink    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
    Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
    Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);

    final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
    final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
    return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
}
项目:flink    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {

    Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
            (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
    reducer = InstantiationUtil.instantiate(reducerClass);

    Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>> combinerClass =
            (Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>>) in.readObject();
    combiner = InstantiationUtil.instantiate(combinerClass);

    jobConf = new JobConf();
    jobConf.readFields(in);
}
项目:flink    文件:HadoopReduceFunction.java   
/**
 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
     *
 * @param hadoopReducer The Hadoop Reducer to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 */
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
    if (hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if (conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }

    this.reducer = hadoopReducer;
    this.jobConf = conf;
}
项目:flink    文件:HadoopReduceFunction.java   
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.reducer.configure(jobConf);

    this.reporter = new HadoopDummyReporter();
    this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
    Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
    TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
    this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
}
项目:flink    文件:HadoopReduceFunction.java   
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
    Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
    Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);

    final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
    final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
    return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
项目:flink    文件:HadoopReduceFunction.java   
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {

    Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
            (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
    reducer = InstantiationUtil.instantiate(reducerClass);

    jobConf = new JobConf();
    jobConf.readFields(in);
}
项目:flink    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.reducer.configure(jobConf);
    this.combiner.configure(jobConf);

    this.reporter = new HadoopDummyReporter();
    Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
    TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
    this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
    this.combineCollector = new HadoopOutputCollector<>();
    this.reduceCollector = new HadoopOutputCollector<>();
}
项目:flink    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
    Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
    Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);

    final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
    final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
    return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
}
项目:flink    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {

    Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
            (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
    reducer = InstantiationUtil.instantiate(reducerClass);

    Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
            (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
    combiner = InstantiationUtil.instantiate(combinerClass);

    jobConf = new JobConf();
    jobConf.readFields(in);
}
项目:flink    文件:HadoopReduceFunction.java   
/**
 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
     * 
 * @param hadoopReducer The Hadoop Reducer to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 */
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
    if(hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if(conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }

    this.reducer = hadoopReducer;
    this.jobConf = conf;
}
项目:flink    文件:HadoopReduceFunction.java   
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
    Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
    Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);

    final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
    final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
    return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
项目:flink    文件:HadoopReduceFunction.java   
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {

    Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
            (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
    reducer = InstantiationUtil.instantiate(reducerClass);

    jobConf = new JobConf();
    jobConf.readFields(in);
}
项目:hiped2    文件:PipelineTest.java   
@Before
public void setUp() {
  mapper1 = new IdentityMapper<Text, Text>();
  reducer1 = new IdentityReducer<Text, Text>();
  mapper2 = new IdentityMapper<Text, Text>();
  reducer2 = new IdentityReducer<Text, Text>();
  driver = new PipelineMapReduceDriver<Text, Text, Text, Text>();
  driver.addMapReduce(new Pair<Mapper, Reducer>(mapper1, reducer1));
  driver.addMapReduce(new Pair<Mapper, Reducer>(mapper2, reducer2));
}
项目:incubator-asterixdb-hyracks    文件:HadoopReducerOperatorDescriptor.java   
@SuppressWarnings("unchecked")
ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException,
        InterruptedException, ClassNotFoundException {
    ((org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer) reducer).super(new MRContextUtil()
            .createReduceContext(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null,
                    null, null, Class.forName("org.apache.hadoop.io.NullWritable"),
                    Class.forName("org.apache.hadoop.io.NullWritable")));
}
项目:incubator-asterixdb-hyracks    文件:HadoopReducerOperatorDescriptor.java   
@Override
public void close() throws HyracksDataException {
    // -- - close - --
    try {
        if (!jobConf.getUseNewMapper()) {
            ((org.apache.hadoop.mapred.Reducer) reducer).close();
        }
    } catch (IOException e) {
        throw new HyracksDataException(e);
    }
}
项目:vs.msc.ws14    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.reducer.configure(jobConf);
    this.combiner.configure(jobConf);

    this.reporter = new HadoopDummyReporter();
    Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
    this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
    this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
    this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
项目:vs.msc.ws14    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
    Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
    Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);

    final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
    final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
    return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
项目:vs.msc.ws14    文件:HadoopReduceCombineFunction.java   
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {

    Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
            (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
    reducer = InstantiationUtil.instantiate(reducerClass);

    Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
            (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
    combiner = InstantiationUtil.instantiate(combinerClass);

    jobConf = new JobConf();
    jobConf.readFields(in);
}
项目:vs.msc.ws14    文件:HadoopReduceFunction.java   
/**
 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
     * 
 * @param hadoopReducer The Hadoop Reducer to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 */
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
    if(hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if(conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }

    this.reducer = hadoopReducer;
    this.jobConf = conf;
}
项目:vs.msc.ws14    文件:HadoopReduceFunction.java   
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.reducer.configure(jobConf);

    this.reporter = new HadoopDummyReporter();
    this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
    Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
    this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
}