Java 类 org.apache.hadoop.mapred.join.CompositeInputFormat 实例源码

项目:book-hadoop-hacks    文件:TestMapSide.java   
/**
 * Configures and runs a map-side inner join of two inputs, writing the joined
 * records with {@code TextOutputFormat}.
 *
 * <p>The join itself is carried out by {@code CompositeInputFormat}, driven by
 * the join expression stored under {@code CompositeInputFormat.JOIN_EXPR}.
 * The expression is built over every input path registered on the job.
 *
 * @param args {@code args[0]}: the master input path(s);
 *     {@code args[1]}: the data input path(s) to join against it
 *     (each may be a comma-separated list);
 *     {@code args[2]}: the output directory
 * @return 0 if the job completed successfully, 1 otherwise
 * @throws IllegalArgumentException if fewer than three arguments are supplied
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    if (args == null || args.length < 3) {
        throw new IllegalArgumentException(
                "Usage: <masterInput> <dataInput> <outputDir>");
    }

    Job job = Job.getInstance(getConf(), "map side join");
    Configuration conf = job.getConfiguration();
    job.setJarByClass(getClass());

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(TupleWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TupleWritable.class);

    // The composite input format performs the join; it must be set explicitly,
    // a null class is rejected by Configuration.setClass.
    job.setInputFormatClass(CompositeInputFormat.class);

    // Register both inputs: the master data set and the data set to join.
    TextInputFormat.addInputPaths(job, args[0]);
    TextInputFormat.addInputPaths(job, args[1]);

    // Build the inner-join expression over all registered input paths, with
    // each input read through KeyValueTextInputFormat.
    conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(
            "inner", KeyValueTextInputFormat.class,
            TextInputFormat.getInputPaths(job)));

    TextOutputFormat.setOutputPath(job, new Path(args[2]));
    job.setOutputFormatClass(TextOutputFormat.class);

    return job.waitForCompletion(true) ? 0 : 1;
}
项目:hadoop-map-reduce-patterns    文件:CompositeUserJoin.java   
/**
 * Runs a composite (map-side) join of a user data set and a comment data set
 * using the old {@code mapred} API. The job is map-only.
 *
 * @param args {@code args[0]}: user input path; {@code args[1]}: comment input
 *     path; {@code args[2]}: output directory; {@code args[3]}: join type passed
 *     to {@code CompositeInputFormat.compose} (for example {@code "inner"})
 * @return 0 on success, 1 if the job did not succeed, 2 on a usage error
 * @throws Exception if job submission or execution fails; in particular
 *     {@code JobClient.runJob} throws {@code IOException} when the job fails
 */
@Override
public int run(String[] args) throws Exception {
    if (args.length != 4) {
        printUsage();
        // Stop here: the args[] lookups below require exactly four arguments.
        return 2;
    }
    Path userPath = new Path(args[0]);
    Path commentPath = new Path(args[1]);
    Path outputDir = new Path(args[2]);
    String joinType = args[3];

    // Build on the Tool's configuration when one has been set. Note that
    // new JobConf(String) would treat its argument as the path of an XML
    // configuration resource, not as a job name.
    JobConf conf = (getConf() == null)
            ? new JobConf(CompositeUserJoin.class)
            : new JobConf(getConf(), CompositeUserJoin.class);
    conf.setJobName("CompositeJoin");
    conf.setJarByClass(CompositeUserJoin.class);
    conf.setMapperClass(CompositeMapper.class);
    conf.setNumReduceTasks(0);
    // Set the input format class to a CompositeInputFormat class.
    // The CompositeInputFormat will parse all of our input files and output
    // records to our mapper.
    conf.setInputFormat(CompositeInputFormat.class);
    // The composite input format join expression will set how the records
    // are going to be read in, and in what input format.
    conf.set("mapred.join.expr", CompositeInputFormat.compose(joinType,
            KeyValueTextInputFormat.class, userPath, commentPath));
    TextOutputFormat.setOutputPath(conf, outputDir);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    // JobClient.runJob blocks until the job has finished, so no polling loop
    // is needed afterwards.
    RunningJob job = JobClient.runJob(conf);
    return job.isSuccessful() ? 0 : 1;
}
项目:applications    文件:MatrixMultiplicationCpu.java   
/**
 * Creates the job configuration for multiplying matrix A by matrix B on the CPU.
 *
 * <p>A and B are read together through {@code CompositeInputFormat} as an inner
 * join of two SequenceFile inputs. The product is written as a SequenceFile of
 * {@code IntWritable} / {@code VectorWritable} pairs.
 * {@code MatrixMultiplicationCpuReducer} serves as both combiner and reducer.
 *
 * @param initialConf    configuration to start from
 * @param aPath          input path of matrix A
 * @param bPath          input path of matrix B
 * @param outPath        output directory of the product
 * @param outCardinality value stored under {@code CONF_OUT_CARD}
 * @param isDebugging    value stored under {@code CONF_DEBUG}
 * @return the fully populated job configuration
 */
public static Configuration createMatrixMultiplicationCpuConf(
    Configuration initialConf, Path aPath, Path bPath, Path outPath,
    int outCardinality, boolean isDebugging) {

  final JobConf job = new JobConf(initialConf, MatrixMultiplicationCpu.class);
  final String name =
      "MatrixMultiplicationCPU: " + aPath + " x " + bPath + " = " + outPath;
  job.setJobName(name);

  // Job parameters, presumably read back by the mapper/reducer.
  job.setInt(CONF_OUT_CARD, outCardinality);
  job.setBoolean(CONF_DEBUG, isDebugging);

  // Input side: inner join of A and B, both stored as SequenceFiles.
  final String joinExpression = CompositeInputFormat.compose("inner",
      SequenceFileInputFormat.class, aPath, bPath);
  job.setInputFormat(CompositeInputFormat.class);
  job.set("mapred.join.expr", joinExpression);

  // Output side.
  job.setOutputFormat(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(job, outPath);

  // Processing chain.
  job.setMapperClass(MatrixMultiplyCpuMapper.class);
  job.setCombinerClass(MatrixMultiplicationCpuReducer.class);
  job.setReducerClass(MatrixMultiplicationCpuReducer.class);

  // Key/value types of the intermediate and final output.
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(VectorWritable.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(VectorWritable.class);

  // JVM options for the child task processes (not the submitting client):
  // fixed 8G initial and maximum heap.
  job.set("mapred.child.java.opts", "-Xms8G -Xmx8G");

  return job;
}
项目:applications    文件:MatrixMultiplicationGpu.java   
/**
 * Creates the job configuration for multiplying matrix A by matrix B on the GPU.
 *
 * <p>A and B are read together through {@code CompositeInputFormat} as an inner
 * join of two SequenceFile inputs. The job is map-only: with zero reduce tasks
 * the mapper output is written directly as a SequenceFile of
 * {@code IntWritable} / {@code VectorWritable} pairs.
 *
 * @param initialConf    configuration to start from
 * @param aPath          input path of matrix A
 * @param bPath          input path of matrix B
 * @param outPath        output directory of the product
 * @param outCardinality value stored under {@code CONF_OUT_CARD}
 * @param tileWidth      value stored under {@code CONF_TILE_WIDTH}
 * @param isDebugging    value stored under {@code CONF_DEBUG}
 * @return the fully populated job configuration
 */
public static Configuration createMatrixMultiplicationGpuConf(
    Configuration initialConf, Path aPath, Path bPath, Path outPath,
    int outCardinality, int tileWidth, boolean isDebugging) {

  final JobConf job = new JobConf(initialConf, MatrixMultiplicationGpu.class);
  final String name =
      "MatrixMultiplicationGPU: " + aPath + " x " + bPath + " = " + outPath;
  job.setJobName(name);

  // Job parameters, presumably read back by the mapper.
  job.setInt(CONF_OUT_CARD, outCardinality);
  job.setInt(CONF_TILE_WIDTH, tileWidth);
  job.setBoolean(CONF_DEBUG, isDebugging);

  // Input side: inner join of A and B, both stored as SequenceFiles.
  final String joinExpression = CompositeInputFormat.compose("inner",
      SequenceFileInputFormat.class, aPath, bPath);
  job.setInputFormat(CompositeInputFormat.class);
  job.set("mapred.join.expr", joinExpression);

  // Output side.
  job.setOutputFormat(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(job, outPath);

  job.setMapperClass(MatrixMultiplyGpuMapper.class);

  // Key/value types of the map output and of the final output.
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(VectorWritable.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(VectorWritable.class);

  // JVM options for the child task processes (not the submitting client),
  // sized for GPU Rootbeer execution: fixed 8G initial and maximum heap.
  job.set("mapred.child.java.opts", "-Xms8G -Xmx8G");

  // Map-only job: zero reducers skips the reduce phase and the mapper output
  // becomes the final output. An identity reducer would still shuffle/sort.
  job.setNumReduceTasks(0);

  return job;
}