Java class org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat — example source code
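MultiTableOutputFormat lets a single MapReduce job write mutations to any number of HBase tables: the map output key is an ImmutableBytesWritable holding the destination table's name, and the value is the Put (or Delete) to apply there. A minimal sketch of that contract follows; the table name "demo-index" and the column names are purely illustrative assumptions, not taken from the snippets below (HBase 1.x API assumed):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class RoutingMapper extends TableMapper<ImmutableBytesWritable, Put> {
    private static final ImmutableBytesWritable INDEX_TABLE =
            new ImmutableBytesWritable(Bytes.toBytes("demo-index"));

    @Override
    protected void map(ImmutableBytesWritable row, Result result, Context context)
            throws IOException, InterruptedException {
        byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
        if (value == null) return;
        Put put = new Put(value);
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("ref"), row.get());
        // The output key names the table this Put is written to.
        context.write(INDEX_TABLE, put);
    }
}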

Project: cs433    File: JobSubmitter.java
public void run() throws Exception {
    String tableName = "contacts";

    Configuration config = HBaseConfiguration.create();

    Scan scan = new Scan();
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false);  // don't set to true for MR jobs

    config.set(TableInputFormat.SCAN, convertScanToString(scan));
    config.set(TableInputFormat.INPUT_TABLE, tableName);

    Job job = new Job(config, "index builder");
    job.setJarByClass(JobSubmitter.class);
    job.setMapperClass(IndexMapper.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(TableInputFormat.class);
    job.setOutputFormatClass(MultiTableOutputFormat.class);

    boolean b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error with job!");
    }
}
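The convertScanToString helper called above is not shown in the snippet; HBase keeps an equivalent (package-private) method in TableMapReduceUtil, and projects typically carry their own copy. A sketch of that copy, assuming the HBase 0.98/1.x protobuf API:

import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;

// Serialize the Scan into the Base64 string that TableInputFormat expects
// under its TableInputFormat.SCAN configuration key.
private static String convertScanToString(Scan scan) throws IOException {
    return Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray());
}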
Project: hraven    File: JobFileProcessor.java
/**
 * @param conf to use to create and run the job
 * @param scan to be used to scan the raw table.
 * @param totalJobCount the total number of jobs that need to be run in this
 *          batch. Used in the job name.
 * @return the job to be submitted to the cluster.
 * @throws IOException if the job cannot be set up
 */
private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount)
    throws IOException {

  Configuration confClone = new Configuration(conf);

  // Turn off speculative execution.
  // Note: must be BEFORE the job construction with the new mapreduce API.
  confClone.setBoolean("mapred.map.tasks.speculative.execution", false);

  // Set up job
  Job job = new Job(confClone, getJobName(totalJobCount));

  // This is a map-only class, skip reduce step
  job.setNumReduceTasks(0);
  job.setJarByClass(JobFileProcessor.class);
  job.setOutputFormatClass(MultiTableOutputFormat.class);

  TableMapReduceUtil.initTableMapperJob(Constants.HISTORY_RAW_TABLE, scan,
      JobFileTableMapper.class, JobFileTableMapper.getOutputKeyClass(),
      JobFileTableMapper.getOutputValueClass(), job);

  return job;
}
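Note that mapred.map.tasks.speculative.execution is the pre-Hadoop-2 property name; on Hadoop 2+ the equivalent key is mapreduce.map.speculative, and the old name still resolves through the deprecation mapping. The method only builds the Job; a hypothetical caller (names assumed, not hraven's actual driver code) would submit it like this:

// Hypothetical driver code: build one batch job and block until it finishes.
Job job = getProcessingJob(conf, scan, totalJobCount);
if (!job.waitForCompletion(true)) {
    throw new IOException("Job " + job.getJobName() + " failed");
}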
Project: yuzhouwan    File: TwoLevelIndexBuilder.java
public static void main(String[] args) throws Exception {

        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";

        TwoLevelIndexBuilder conn = new TwoLevelIndexBuilder(rootDir, zkServer, port);

        Configuration conf = conn.conf;
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // usage: TwoLevelIndexBuilder <TableName> <ColumnFamily> <Qualifier...>
        if (otherArgs.length < 3) {
            System.err.println("Usage: TwoLevelIndexBuilder <tableName> <columnFamily> <qualifier>...");
            System.exit(-1);
        }
        // table name
        String tableName = otherArgs[0];
        // column family
        String columnFamily = otherArgs[1];

        conf.set("tableName", tableName);
        conf.set("columnFamily", columnFamily);

        // columns (there may be more than one)
        String[] qualifiers = new String[otherArgs.length - 2];
        System.arraycopy(otherArgs, 2, qualifiers, 0, qualifiers.length);

        // store the qualifiers in the job configuration
        conf.setStrings("qualifiers", qualifiers);

        Job job = new Job(conf, tableName);

        job.setJarByClass(TwoLevelIndexBuilder.class);
        job.setMapperClass(TowLevelIndexMapper.class);
        job.setNumReduceTasks(0);       // map-only job; no reduce phase needed
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(),
                TowLevelIndexMapper.class, ImmutableBytesWritable.class, Put.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
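TowLevelIndexMapper (the misspelling is the project's own class name) is not shown above. A hedged sketch of what such a secondary-index mapper typically looks like, under the assumption that each qualifier gets its own index table named <tableName>-<qualifier>, with the indexed value as row key and a single cell pointing back at the data row; the index family/qualifier "f1:id" and the table-naming scheme are assumptions, HBase 1.x API:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class TowLevelIndexMapper extends TableMapper<ImmutableBytesWritable, Put> {
    private String tableName;
    private byte[] columnFamily;
    private String[] qualifiers;

    @Override
    protected void setup(Context context) {
        // Read back the values the driver stored in the configuration.
        Configuration conf = context.getConfiguration();
        tableName = conf.get("tableName");
        columnFamily = Bytes.toBytes(conf.get("columnFamily"));
        qualifiers = conf.getStrings("qualifiers");
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result result, Context context)
            throws IOException, InterruptedException {
        for (String qualifier : qualifiers) {
            byte[] value = result.getValue(columnFamily, Bytes.toBytes(qualifier));
            if (value == null) continue;
            // Index row key is the indexed value; the cell stores the source
            // row key so a read of the index can hop back to the data table.
            Put put = new Put(value);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), row.get());
            // The output key selects the destination (index) table.
            context.write(new ImmutableBytesWritable(
                    Bytes.toBytes(tableName + "-" + qualifier)), put);
        }
    }
}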