Example source code for the Java class org.apache.hadoop.hbase.mapreduce.ImportTsv

Project: HIndex    File: IndexTsvImporterMapper.java
/**
 * Handles initializing this class with objects specific to it (i.e., the parser). Common
 * initialization that might be leveraged by a subclass is done in <code>doSetup</code>. Hence a
 * subclass may choose to override this method and call <code>doSetup</code> as well before
 * handling its own custom params.
 * @param context
 */
@Override
protected void setup(Context context) throws IOException {
  doSetup(context);

  Configuration conf = context.getConfiguration();

  parser = new TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
  if (parser.getRowKeyColumnIndex() == -1) {
    throw new RuntimeException("No row key column specified");
  }
  String tableName = conf.get(TableInputFormat.INPUT_TABLE);
  HTable hTable = null;
  try {
    hTable = new HTable(conf, tableName);
    this.startKeys = hTable.getStartKeys();
    byte[] indexBytes = hTable.getTableDescriptor().getValue(Constants.INDEX_SPEC_KEY);
    if (indexBytes != null) {
      TableIndices tableIndices = new TableIndices();
      tableIndices.readFields(indexBytes);
      this.indices = tableIndices.getIndices();
    }
  } finally {
    if (hTable != null) hTable.close();
  }
}
Project: HIndex    File: IndexTsvImporterMapper.java
/**
 * Handles common parameter initialization that a subclass might want to leverage.
 * @param context
 */
protected void doSetup(Context context) {
  Configuration conf = context.getConfiguration();

  // If a custom separator has been used,
  // decode it back from Base64 encoding.
  separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
  if (separator == null) {
    separator = ImportTsv.DEFAULT_SEPARATOR;
  } else {
    separator = new String(Base64.decode(separator));
  }

  // Should never get 0 as we are setting this to a valid value in job configuration.
  ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);

  skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
  badLineCount = context.getCounter("ImportTsv", "Bad Lines");
  hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
  indexedTable = conf.getBoolean(IndexMapReduceUtil.IS_INDEXED_TABLE, false);
}
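These keys are only read here; they are normally populated on the submission side, either through -D options (as in the tests below) or by a driver. A minimal sketch of a driver setting them programmatically, using only the ImportTsv constants already referenced above; the column spec and separator values are made up for illustration:

Configuration conf = HBaseConfiguration.create();
// positional mapping of input fields to the row key and target columns
conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
// raw separator; createSubmittableJob() re-encodes it as Base64 before the mapper decodes it
conf.set(ImportTsv.SEPARATOR_CONF_KEY, "|");
// fixed cell timestamp, used when the data carries no HBASE_TS_KEY column
conf.setLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
// count and skip malformed lines instead of failing the task
conf.setBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);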
Project: HIndex    File: TestIndexImportTsv.java
@Test
public void testTsvParser() throws BadTsvLineException {
  TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
  assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
  assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
  assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
  assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
  assertNull(parser.getFamily(2));
  assertNull(parser.getQualifier(2));
  assertEquals(2, parser.getRowKeyColumnIndex());

  assertEquals(ImportTsv.TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
    parser.getTimestampKeyColumnIndex());

  byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
  ParsedLine parsed = parser.parse(line, line.length);
  checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
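For comparison, a short sketch of the timestamp-key case, assuming the HIndex parser keeps stock ImportTsv.TsvParser semantics: an HBASE_TS_KEY column is not a real family/qualifier, and its position is reported through getTimestampKeyColumnIndex() instead of the -1 default asserted above.

TsvParser tsParser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A", "\t");
assertEquals(0, tsParser.getRowKeyColumnIndex());
// the HBASE_TS_KEY field supplies the cell timestamp rather than a column value
assertEquals(1, tsParser.getTimestampKeyColumnIndex());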
Project: HIndex    File: TestIndexImportTsv.java
@Test
public void testMROnTable() throws Exception {
  String TABLE_NAME = "testMROnTable";
  String FAMILY = "FAM";
  String INPUT_FILE = "InputFile.esv";

  // Prepare the arguments required for the test.
  String[] args =
      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
          "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", TABLE_NAME, INPUT_FILE };

  doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
}
Project: HIndex    File: TestIndexImportTsv.java
@Test
public void testMROnTableWithTimestamp() throws Exception {
  String TABLE_NAME = "testMROnTableWithTimestamp";
  String FAMILY = "FAM";
  String INPUT_FILE = "InputFile1.csv";

  // Prepare the arguments required for the test.
  String[] args =
      new String[] {
          "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
          "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };

  String data = "KEY,1234,VALUE1,VALUE2\n";
  doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
}
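For the data line above, a hedged sketch of the Put the mapper is expected to emit, assuming stock ImportTsv semantics: "KEY" becomes the row key, "1234" (the HBASE_TS_KEY field) becomes the cell timestamp, and the remaining fields land in FAM:A and FAM:B.

Put expected = new Put(Bytes.toBytes("KEY"));
expected.add(Bytes.toBytes("FAM"), Bytes.toBytes("A"), 1234L, Bytes.toBytes("VALUE1"));
expected.add(Bytes.toBytes("FAM"), Bytes.toBytes("B"), 1234L, Bytes.toBytes("VALUE2"));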
Project: HIndex    File: TestIndexImportTsv.java
@Test
public void testBulkOutputWithoutAnExistingTable() throws Exception {
  String TABLE_NAME = "testBulkOutputWithoutAnExistingTable";
  String FAMILY = "FAM";
  String INPUT_FILE = "InputFile2.esv";

  // Prepare the arguments required for the test.
  String[] args =
      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
          "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
          "-Dtable.columns.index=IDX1=>FAM:[A->String&10]",TABLE_NAME, INPUT_FILE };
  doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
}
Project: HIndex    File: TestIndexImportTsv.java
@Test
public void testIndexBulkLoad() throws Exception {
  String TABLE_NAME = "testIndexBulkLoad";
  String FAMILY = "FAM";
  String INPUT_FILE = "InputFile2.esv";

  // Prepare the arguments required for the test.
  String[] args =
      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
          "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
          "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output",
          "-Dtable.columns.index=IDX1=>FAM:[A->String&10]",TABLE_NAME, INPUT_FILE };
  doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
}
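With BULK_OUTPUT_CONF_KEY set, the job writes HFiles under "output" instead of sending Puts to the region servers; loading them into the table is a separate step. A hedged sketch of that follow-up step for the user table, using the standard completebulkload tool (conf and TABLE_NAME as in the test above; the index HFiles produced by IndexHFileOutputFormat may need their own load step):

LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(new Path("output"), new HTable(conf, TABLE_NAME));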
Project: HGraph    File: AbstractHBaseMiniClusterTest.java
/**
 * Imports a local resource file into the given table.
 * @param conf
 * @param args
 * @param TABLE_NAME
 * @param INPUT_FILE
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
protected static void importLocalFile2Table(
    final Configuration conf, final String[] args, final String TABLE_NAME,
    final String INPUT_FILE)
    throws IOException, InterruptedException, ClassNotFoundException {
  Validate.notEmpty(INPUT_FILE, "INPUT_FILE shall not be empty or null");

  // Copy the classpath resource onto the file system used by the job.
  InputStream ips = ClassLoader.getSystemResourceAsStream(INPUT_FILE);
  assertNotNull(ips);

  FileSystem fs = FileSystem.get(conf);
  FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
  IOUtils.write(IOUtils.toString(ips), op, HConstants.UTF8_ENCODING);
  IOUtils.closeQuietly(op);
  IOUtils.closeQuietly(ips);

  // Append the table name and input path as the two positional arguments ImportTsv expects.
  int length = args.length + 2;
  String[] newArgs = new String[length];
  System.arraycopy(args, 0, newArgs, 0, args.length);
  newArgs[length - 2] = TABLE_NAME;
  newArgs[length - 1] = INPUT_FILE;

  Job job = ImportTsv.createSubmittableJob(conf, newArgs);
  job.waitForCompletion(true);
  assertTrue(job.isSuccessful());
}
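A minimal usage sketch of this helper, assuming conf comes from the running mini-cluster; the column spec, table name, and resource file name below are hypothetical. Because the helper appends the table name and input path as the two positional arguments ImportTsv expects, the column mapping is supplied through the Configuration rather than as extra args:

conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,property:name"); // hypothetical column spec
importLocalFile2Table(conf, new String[0], "test.vertex", "vertex-test.tsv"); // hypothetical names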
Project: HIndex    File: IndexImportTsv.java
/**
 * Sets up the actual job.
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 * @throws ClassNotFoundException When the configured mapper class cannot be found.
 */
public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException,
    ClassNotFoundException {
  HBaseAdmin admin = new IndexAdmin(conf);
  // Support non-XML supported characters
  // by re-encoding the passed separator as a Base64 string.
  String actualSeparator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
  if (actualSeparator != null) {
    conf.set(ImportTsv.SEPARATOR_CONF_KEY, Base64.encodeBytes(actualSeparator.getBytes()));
  }

  // See if a non-default Mapper was set
  String mapperClassName = conf.get(ImportTsv.MAPPER_CONF_KEY);
  Class mapperClass = mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;

  String tableName = args[0];
  Path inputDir = new Path(args[1]);

  String input = conf.get(IndexUtils.TABLE_INPUT_COLS);
  HTableDescriptor htd = null;
  if (!admin.tableExists(tableName)) {
    htd =
        ImportTsv.prepareHTableDescriptor(tableName, conf.getStrings(ImportTsv.COLUMNS_CONF_KEY));
    if (input != null) {
      htd = IndexUtils.parse(tableName, htd, input, null);
    }
    admin.createTable(htd);
  }

  conf.set(TableInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(IndexMapReduceUtil.IS_INDEXED_TABLE, input != null);
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(mapperClass);
  FileInputFormat.setInputPaths(job, inputDir);
  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(mapperClass);

  String hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
  if (hfileOutPath != null) {
    HTable table = new HTable(conf, tableName);
    job.setReducerClass(PutSortReducer.class);
    Path outputDir = new Path(hfileOutPath);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    IndexHFileOutputFormat.configureIncrementalLoad(job, table);
  } else {
    // No reducers. Just write straight to table. Call initTableReducerJob
    // to set up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
  }

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
    com.google.common.base.Function.class /* Guava used by TsvParser */);
  return job;
}
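A hedged driver sketch showing how such a job is typically launched; it mirrors the structure of stock ImportTsv.main, with GenericOptionsParser separating -D options from the two positional arguments (the usage string is illustrative):

public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  // GenericOptionsParser pulls out the -D options, leaving <tablename> <inputdir>
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length < 2) {
    System.err.println("Usage: IndexImportTsv -Dimporttsv.columns=... <tablename> <inputdir>");
    System.exit(-1);
  }
  Job job = createSubmittableJob(conf, otherArgs);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}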