Java class org.apache.hadoop.hbase.mapreduce.PutSortReducer example source code
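
PutSortReducer is the reducer HBase ships for bulk-load jobs whose mappers emit (ImmutableBytesWritable, Put) pairs: for each row key it sorts the cells contained in the Puts and hands them on as KeyValues, so that HFileOutputFormat / HFileOutputFormat2 can write properly ordered HFiles. All of the snippets below wire it into a job in essentially the same way. As a minimal, hedged sketch of the producing side (the class name, input layout and column names are made up for illustration and do not come from any of the projects below; an HBase 1.x client API is assumed), a matching mapper could look like this:

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Hypothetical mapper: turns "rowkey<TAB>value" text lines into Puts for PutSortReducer. */
public class TsvToPutMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static final byte[] FAMILY = Bytes.toBytes("d");      // assumed column family
    private static final byte[] QUALIFIER = Bytes.toBytes("val"); // assumed qualifier

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split("\t", 2);
        if (fields.length < 2) {
            return; // skip malformed lines
        }
        byte[] rowKey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowKey);
        put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(fields[1]));
        // PutSortReducer expects the map output key to be the row key of the Put.
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}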

Project: bigdata_pattern    File: HBaseToHFileMapReduce.java
/**
 * Sets up the actual job.
 * 
 * @param conf
 *            The current configuration.
 * @param args
 *            The command line parameters.
 * @return The newly created job.
 * @throws IOException
 *             When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
        throws IOException {

    String tableName = args[0];
    Path inputDir = new Path(args[1]);
    Job job = new Job(conf, "HBaseToHFileMapReduce");
    job.setJarByClass(HBaseToHFileMapReduce.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(HourlyImporter.class);

    if (args.length < 3) {
        // ++++ insert into table directly using TableOutputFormat ++++
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);
        job.setNumReduceTasks(0);
    } else {
        // ++++ to generate HFile instead ++++
        HTable table = new HTable(conf, tableName);
        job.setReducerClass(PutSortReducer.class);
        Path outputDir = new Path(args[2]);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        HFileOutputFormat.configureIncrementalLoad(job, table);         
    }       

    TableMapReduceUtil.addDependencyJars(job);
    return job;
}
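
The else branch above only produces HFiles under outputDir; once the job has finished they still have to be handed to the region servers. A hedged sketch of that follow-up step, using the completebulkload tool that ships with HBase (the helper name is made up; the SOAPgaea examples further down do the equivalent in their LoadHFile2HBase methods):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

/** Hypothetical helper: moves the HFiles written by the job into the target table. */
private static void completeBulkLoad(Configuration conf, String tableName, Path outputDir)
        throws Exception {
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // The classic client API accepts an HTable handle; newer clients pass an Admin,
    // Table and RegionLocator instead.
    loader.doBulkLoad(outputDir, new HTable(conf, tableName));
}
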
Project: SOAPgaea    File: LoadVCFToHBase.java
@Override
public int run(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    String[] remainArgs = remainArgs(args, conf);
    options = new LoadVCFToHBaseOptions();
    options.parse(remainArgs);
    options.setHadoopConf(remainArgs, conf);

    conf.addResource(new Path(options.getConfig() + "hbase-site.xml"));
    conf.addResource(new Path(options.getConfig() + "core-site.xml"));
    conf.set("vcfHeader", options.getHeaderOutput());
    Job job = new Job(conf);

    createTable(conf,options.getTableName());

    MultipleVCFHeader vcfHeaders = new MultipleVCFHeader();
    vcfHeaders.mergeHeader(new Path(options.getInput()),options.getHeaderOutput(), job, false);

    job.setJobName("vcf to hbase");
    job.setNumReduceTasks(options.getReducerNumber());
    job.setInputFormatClass(VCFMultipleInputFormat.class);

    job.setJarByClass(LoadVCFToHBase.class);
    job.setMapperClass(VCFToHBaseMapper.class);
    job.setReducerClass(PutSortReducer.class);

    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);

    FileInputFormat.setInputPaths(job, new Path(options.getInput()));
    FileOutputFormat.setOutputPath(job, new Path(options.getHFileOutput()));

    HFileOutputFormat2.configureIncrementalLoad(job, new HTable(conf,options.getTableName()));

    if (job.waitForCompletion(true)) {
        LoadHFile2HBase(conf,options.getTableName(),options.getHFileOutput());
        return 0;
    } else {
        return 1;
    }
}
Project: SOAPgaea    File: DBNSFPToHbase.java
@Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        String[] remainArgs = remainArgs(args, conf);
        options = new DBNSFPToHbaseOptions();
        options.parse(remainArgs);
        options.setHadoopConf(remainArgs, conf);
        tableName = TableName.valueOf(options.getTableName());
        conf.set("DEFAULT_COLUMN_FAMILY", "data");
        conf.addResource(new Path(options.getConfig() + "hbase-site.xml"));
        conf.addResource(new Path(options.getConfig() + "core-site.xml"));
        conn = ConnectionFactory.createConnection(conf);

        setHeader(new Path(options.getInput()), conf);
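        // Cap how many cell bytes PutSortReducer buffers for a single row before it
        // flushes a partially sorted batch (here 256 MB instead of the default).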
        long reduceThreshMem = (long) (1 << 28);
        conf.setLong("putsortreducer.row.threshold", reduceThreshMem);

        Job job = Job.getInstance(conf, "dbNSFPtoHbase");
        createTable(tableName);

        job.setJarByClass(org.bgi.flexlab.gaea.tools.mapreduce.annotator.databaseload.DBNSFPToHbase.class);
        job.setMapperClass(DBNSFPToHbaseMapper.class);
        job.setReducerClass(PutSortReducer.class);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        FileInputFormat.setInputPaths(job, new Path(options.getInput()));
        FileOutputFormat.setOutputPath(job, new Path(options.getHFileOutput()));

//        HFileOutputFormat2.configureIncrementalLoad(job, new HTable(conf,options.getTableName()));
        HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tableName), conn.getRegionLocator(tableName));

        if (job.waitForCompletion(true)) {
            LoadHFile2HBase(conf, tableName, options.getHFileOutput());
            conn.close();
            return 0;
        } else {
            conn.close();
            return 1;
        }
    }
Project: HIndex    File: IndexImportTsv.java
/**
 * Sets up the actual job.
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 * @throws ClassNotFoundException When the configured mapper class cannot be found.
 */
public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException,
    ClassNotFoundException {
  HBaseAdmin admin = new IndexAdmin(conf);
  // Support non-XML supported characters
  // by re-encoding the passed separator as a Base64 string.
  String actualSeparator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
  if (actualSeparator != null) {
    conf.set(ImportTsv.SEPARATOR_CONF_KEY, Base64.encodeBytes(actualSeparator.getBytes()));
  }

  // See if a non-default Mapper was set
  String mapperClassName = conf.get(ImportTsv.MAPPER_CONF_KEY);
  Class mapperClass = mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;

  String tableName = args[0];
  Path inputDir = new Path(args[1]);

  String input = conf.get(IndexUtils.TABLE_INPUT_COLS);
  HTableDescriptor htd = null;
  if (!admin.tableExists(tableName)) {
    htd =
        ImportTsv.prepareHTableDescriptor(tableName, conf.getStrings(ImportTsv.COLUMNS_CONF_KEY));
    if (input != null) {
      htd = IndexUtils.parse(tableName, htd, input, null);
    }
    admin.createTable(htd);
  }

  conf.set(TableInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(IndexMapReduceUtil.IS_INDEXED_TABLE, input != null);
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(mapperClass);
  FileInputFormat.setInputPaths(job, inputDir);
  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(mapperClass);

  String hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
  if (hfileOutPath != null) {
    HTable table = new HTable(conf, tableName);
    job.setReducerClass(PutSortReducer.class);
    Path outputDir = new Path(hfileOutPath);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    IndexHFileOutputFormat.configureIncrementalLoad(job, table);
  } else {
    // No reducers. Just write straight to table. Call initTableReducerJob
    // to set up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
  }

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
    com.google.common.base.Function.class /* Guava used by TsvParser */);
  return job;
}
Project: hbase-in-action    File: BulkImportJobExample.java
public static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
  Path inputDir = new Path(args[0]);
  Path outputDir = new Path(args[1]);
  boolean createPartitionFile = Boolean.parseBoolean(args[2]);

  Job job = Job.getInstance(conf,
    "Import delicious RSS feed into Hush tables.");
  job.setJarByClass(BulkImportJobExample.class);

  job.setInputFormatClass(TextInputFormat.class);
  // conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
  FileInputFormat.setInputPaths(job, inputDir);

  job.setMapperClass(BulkImportMapper.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(Put.class);

  job.setPartitionerClass(TotalOrderPartitioner.class);

  job.setReducerClass(PutSortReducer.class);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);

  job.setOutputFormatClass(HFileOutputFormat.class);
  HFileOutputFormat.setOutputPath(job, outputDir);

  HFileOutputFormat.setCompressOutput(job, true);
  HFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
  job.getConfiguration().set("hfile.compression", "gz");

  //job.getConfiguration().setFloat("mapred.job.shuffle.input.buffer.percent", 0.5f);
  //job.setNumReduceTasks(30);

  Path partitionsPath = new Path(job.getWorkingDirectory(),
    "partitions_" + System.currentTimeMillis());
  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);

  if (createPartitionFile) {
    VerboseInputSampler.Sampler<KeyValue, ImmutableBytesWritable> sampler =
      new VerboseInputSampler.VerboseRandomSampler<KeyValue, ImmutableBytesWritable>(0.05, 1000000, 30);       // use 0.1 for real sampling

    LOG.info("Sampling key space");
    VerboseInputSampler.writePartitionFile(job, sampler);
    LOG.info("Samping done");
  }

  URI cacheUri = new URI(partitionsPath.toString() + "#" +
    TotalOrderPartitioner.DEFAULT_PATH);
  DistributedCache.addCacheFile(cacheUri, job.getConfiguration());
  DistributedCache.createSymlink(job.getConfiguration());

  return job;
}
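
Unlike the other snippets on this page, the example above sets up the TotalOrderPartitioner, the sampling pass and the partition file by hand. For comparison, a hedged sketch of the more common shortcut (the table name is hypothetical): a single configureIncrementalLoad call derives the split points from the target table's region boundaries and also selects PutSortReducer when the map output value class is Put.

// Hedged alternative sketch: let HBase derive the partitioner split points from the
// table's regions instead of sampling the input (table name is made up).
HTable table = new HTable(conf, "hush_urls");
HFileOutputFormat.configureIncrementalLoad(job, table);
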
Project: kylin    File: HFileOutputFormat3.java
static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
                                     RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
        UnsupportedEncodingException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(cls);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(TextSortReducer.class);
    } else {
        LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
            MutationSerialization.class.getName(), ResultSerialization.class.getName(),
            KeyValueSerialization.class.getName());

    // Use table's region boundaries for TotalOrderPartitioner (TOP) split points.
    LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
            "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys);
    // Set compression algorithms based on column families
    configureCompression(conf, tableDescriptor);
    configureBloomType(tableDescriptor, conf);
    configureBlockSize(tableDescriptor, conf);
    configureDataBlockEncoding(tableDescriptor, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
}
Project: learning-hadoop    File: ImportTsv.java
/**
 * Sets up the actual job.
 * 
 * @param conf
 *            The current configuration.
 * @param args
 *            The command line parameters.
 * @return The newly created job.
 * @throws IOException
 *             When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
        throws IOException, ClassNotFoundException {

    // Support non-XML supported characters
    // by re-encoding the passed separator as a Base64 string.
    String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
    if (actualSeparator != null) {
        conf.set(SEPARATOR_CONF_KEY,
                new String(Base64.encodeBytes(actualSeparator.getBytes())));
    }

    // See if a non-default Mapper was set
    String mapperClassName = conf.get(MAPPER_CONF_KEY);
    Class mapperClass = mapperClassName != null ? Class
            .forName(mapperClassName) : DEFAULT_MAPPER;

    String tableName = args[0];
    Path inputDir = new Path(args[1]);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(mapperClass);
    FileInputFormat.setInputPaths(job, inputDir);

    String inputCodec = conf.get(INPUT_LZO_KEY);
    if (inputCodec == null) {
        FileInputFormat.setMaxInputSplitSize(job, 67108864L); // max split size = 64 MB
        job.setInputFormatClass(TextInputFormat.class);
    } else {
        if (inputCodec.equalsIgnoreCase("lzo"))
            job.setInputFormatClass(LzoTextInputFormat.class);
        else {
            usage("not supported compression codec!");
            System.exit(-1);
        }
    }

    job.setMapperClass(mapperClass);

    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
        HTable table = new HTable(conf, tableName);
        job.setReducerClass(PutSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        HFileOutputFormat.configureIncrementalLoad(job, table);
    } else {
        // No reducers. Just write straight to table. Call initTableReducerJob
        // to set up the TableOutputFormat.
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);
        job.setNumReduceTasks(0);
    }

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
            com.google.common.base.Function.class /* Guava used by TsvParser */);
    return job;
}
Project: hindex    File: IndexImportTsv.java
/**
 * Sets up the actual job.
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 * @throws InterruptedException
 */
public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException,
    ClassNotFoundException, InterruptedException {

  // Support non-XML supported characters
  // by re-encoding the passed separator as a Base64 string.
  String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
  if (actualSeparator != null) {
    conf.set(SEPARATOR_CONF_KEY, Base64.encodeBytes(actualSeparator.getBytes()));
  }

  // See if a non-default Mapper was set
  String mapperClassName = conf.get(MAPPER_CONF_KEY);
  Class mapperClass = mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;

  String tableName = args[0];
  Path inputDir = new Path(args[1]);

  String input = conf.get(IndexUtils.TABLE_INPUT_COLS);
  if (!doesTableExist(tableName)) {
    createTable(conf, tableName);
    if (input != null) {
      IndexUtils.createIndexTable(tableName, conf, null);
    }
  }

  conf.set(TableInputFormat.INPUT_TABLE, tableName);
  boolean indexedTable = IndexMapReduceUtil.isIndexedTable(conf);
  conf.setBoolean(IndexMapReduceUtil.INDEX_IS_INDEXED_TABLE, indexedTable);
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(mapperClass);
  FileInputFormat.setInputPaths(job, inputDir);
  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(mapperClass);

  String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
  if (hfileOutPath != null) {
    HTable table = new HTable(conf, tableName);
    job.setReducerClass(PutSortReducer.class);
    Path outputDir = new Path(hfileOutPath);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    IndexHFileOutputFormat.configureIncrementalLoad(job, table);
  } else {
    // No reducers. Just write straight to table. Call initTableReducerJob
    // to set up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
  }

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
    com.google.common.base.Function.class /* Guava used by TsvParser */);
  return job;
}