Java class org.apache.hadoop.hbase.regionserver.wal.WALCellCodec: usage examples from open-source projects
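WALCellCodec is the pluggable codec HBase uses to encode and decode the cells stored in write-ahead-log (WAL) files. The class to use is resolved from the hbase.regionserver.wal.codec configuration key, exposed as WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, and the snippets below show how several projects configure, resolve, and exercise it during WAL writing, splitting, and replay. As a minimal orientation sketch (the create(...) overload is an assumption based on recent HBase releases; verify the exact signature against your version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;

public class WALCellCodecConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Swap in an alternative codec; WAL readers and writers resolve it from this key.
    conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
        WALCellCodec.class);
    // Read the configured class name back, defaulting to WALCellCodec itself.
    String codecClassName =
        conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    // Instantiate it; a null CompressionContext means no WAL dictionary compression.
    // NOTE: assumed overload -- check WALCellCodec.create(...) in your release.
    WALCellCodec codec = WALCellCodec.create(conf, codecClassName, null);
    System.out.println("Using WAL cell codec: " + codec.getClass().getName());
  }
}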

Project: ditb    File: WALKey.java
public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
    WALCellCodec.ByteStringCompressor compressor) throws IOException {
  org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
  if (compressionContext == null) {
    builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
    builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
  } else {
    builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
        compressionContext.regionDict));
    builder.setTableName(compressor.compress(this.tablename.getName(),
        compressionContext.tableDict));
  }
  builder.setLogSequenceNumber(this.logSeqNum);
  builder.setWriteTime(writeTime);
  if (this.origLogSeqNum > 0) {
    builder.setOrigSequenceNumber(this.origLogSeqNum);
  }
  if (this.nonce != HConstants.NO_NONCE) {
    builder.setNonce(nonce);
  }
  if (this.nonceGroup != HConstants.NO_NONCE) {
    builder.setNonceGroup(nonceGroup);
  }
  HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
  for (UUID clusterId : clusterIds) {
    uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
    builder.addClusterIds(uuidBuilder.build());
  }
  if (scopes != null) {
    for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
      ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
          : compressor.compress(e.getKey(), compressionContext.familyDict);
      builder.addScopes(FamilyScope.newBuilder()
          .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
    }
  }
  return builder;
}
Project: ditb    File: WALSplitter.java
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
    FileSystem fs, LastSequenceId idChecker,
    CoordinatedStateManager csm, RecoveryMode mode) {
  this.conf = HBaseConfiguration.create(conf);
  String codecClassName = conf
      .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
  this.rootDir = rootDir;
  this.fs = fs;
  this.sequenceIdChecker = idChecker;
  this.csm = (BaseCoordinatedStateManager)csm;
  this.walFactory = factory;
  this.controller = new PipelineController();

  entryBuffers = new EntryBuffers(controller,
      this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
          128 * 1024 * 1024));

  // a larger minBatchSize may slow down recovery because the replay writer has to wait for
  // enough edits before replaying them
  this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
  this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);

  this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
  if (csm != null && this.distributedLogReplay) {
    outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
  } else {
    if (this.distributedLogReplay) {
      LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
    }
    this.distributedLogReplay = false;
    outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
  }
}
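The constructor above takes its tuning knobs straight from the configuration. A small sketch of the keys it reads (the values are illustrative only, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class WalSplitTuningSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Heap used to buffer edits while splitting (128 MB by default above).
    conf.setInt("hbase.regionserver.hlog.splitlog.buffersize", 256 * 1024 * 1024);
    // Minimum number of edits a replay writer batches before replaying (default 64).
    conf.setInt("hbase.regionserver.wal.logreplay.batch.size", 128);
    // Writer threads used by the output sink (default 3).
    conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 4);
  }
}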
Project: ditb    File: TestWALReaderOnSecureWAL.java
private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
  Configuration conf = TEST_UTIL.getConfiguration();
  String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
    WALCellCodec.class);
  try {
    TableName tableName = TableName.valueOf(tblName);
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(tableName.getName()));
    HRegionInfo regioninfo = new HRegionInfo(tableName,
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
    final int total = 10;
    final byte[] row = Bytes.toBytes("row");
    final byte[] family = Bytes.toBytes("family");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);

    // Write the WAL
    WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
      wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis(), mvcc), kvs, true);
    }
    wal.sync();
    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
    wal.shutdown();

    return walPath;
  } finally {
    // restore the cell codec class
    conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
  }
}
Project: pbase    File: WALKey.java
public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
    WALCellCodec.ByteStringCompressor compressor) throws IOException {
  org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
  if (compressionContext == null) {
    builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
    builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
  } else {
    builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
      compressionContext.regionDict));
    builder.setTableName(compressor.compress(this.tablename.getName(),
      compressionContext.tableDict));
  }
  builder.setLogSequenceNumber(this.logSeqNum);
  builder.setWriteTime(writeTime);
  if (this.origLogSeqNum > 0) {
    builder.setOrigSequenceNumber(this.origLogSeqNum);
  }
  if (this.nonce != HConstants.NO_NONCE) {
    builder.setNonce(nonce);
  }
  if (this.nonceGroup != HConstants.NO_NONCE) {
    builder.setNonceGroup(nonceGroup);
  }
  HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
  for (UUID clusterId : clusterIds) {
    uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
    builder.addClusterIds(uuidBuilder.build());
  }
  if (scopes != null) {
    for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
      ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
          : compressor.compress(e.getKey(), compressionContext.familyDict);
      builder.addScopes(FamilyScope.newBuilder()
          .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
    }
  }
  return builder;
}
Project: pbase    File: WALSplitter.java
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
    FileSystem fs, LastSequenceId idChecker,
    CoordinatedStateManager csm, RecoveryMode mode) {
  this.conf = HBaseConfiguration.create(conf);
  String codecClassName = conf
      .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
  this.rootDir = rootDir;
  this.fs = fs;
  this.sequenceIdChecker = idChecker;
  this.csm = (BaseCoordinatedStateManager)csm;
  this.walFactory = factory;

  entryBuffers = new EntryBuffers(
      this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
          128 * 1024 * 1024));

  // a larger minBatchSize may slow down recovery because the replay writer has to wait for
  // enough edits before replaying them
  this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
  this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);

  this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
  if (csm != null && this.distributedLogReplay) {
    outputSink = new LogReplayOutputSink(numWriterThreads);
  } else {
    if (this.distributedLogReplay) {
      LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
    }
    this.distributedLogReplay = false;
    outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
  }
}
Project: pbase    File: TestWALReaderOnSecureWAL.java
private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
  Configuration conf = TEST_UTIL.getConfiguration();
  String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
    WALCellCodec.class);
  try {
    TableName tableName = TableName.valueOf(tblName);
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(tableName.getName()));
    HRegionInfo regioninfo = new HRegionInfo(tableName,
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
    final int total = 10;
    final byte[] row = Bytes.toBytes("row");
    final byte[] family = Bytes.toBytes("family");
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path logDir = TEST_UTIL.getDataTestDir(tblName);
    final AtomicLong sequenceId = new AtomicLong(1);

    // Write the WAL
    WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
      wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis()), kvs, sequenceId, true, null);
    }
    wal.sync();
    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
    wal.shutdown();

    return walPath;
  } finally {
    // restore the cell codec class
    conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
  }
}
Project: hbase    File: WALKeyImpl.java
public WALProtos.WALKey.Builder getBuilder(
    WALCellCodec.ByteStringCompressor compressor) throws IOException {
  WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
  if (compressionContext == null) {
    builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName));
    builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName()));
  } else {
    builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
        compressionContext.regionDict));
    builder.setTableName(compressor.compress(this.tablename.getName(),
        compressionContext.tableDict));
  }
  builder.setLogSequenceNumber(getSequenceId());
  builder.setWriteTime(writeTime);
  if (this.origLogSeqNum > 0) {
    builder.setOrigSequenceNumber(this.origLogSeqNum);
  }
  if (this.nonce != HConstants.NO_NONCE) {
    builder.setNonce(nonce);
  }
  if (this.nonceGroup != HConstants.NO_NONCE) {
    builder.setNonceGroup(nonceGroup);
  }
  HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
  for (UUID clusterId : clusterIds) {
    uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
    builder.addClusterIds(uuidBuilder.build());
  }
  if (replicationScope != null) {
    for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
      ByteString family = (compressionContext == null)
          ? UnsafeByteOperations.unsafeWrap(e.getKey())
          : compressor.compress(e.getKey(), compressionContext.familyDict);
      builder.addScopes(FamilyScope.newBuilder()
          .setFamily(family).setScopeType(ScopeType.forNumber(e.getValue())));
    }
  }
  return builder;
}
Project: hbase    File: WALKeyImpl.java
public void readFieldsFromPb(WALProtos.WALKey walKey,
                             WALCellCodec.ByteStringUncompressor uncompressor)
    throws IOException {
  if (this.compressionContext != null) {
    this.encodedRegionName = uncompressor.uncompress(
        walKey.getEncodedRegionName(), compressionContext.regionDict);
    byte[] tablenameBytes = uncompressor.uncompress(
        walKey.getTableName(), compressionContext.tableDict);
    this.tablename = TableName.valueOf(tablenameBytes);
  } else {
    this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
    this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
  }
  clusterIds.clear();
  for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
    clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
  }
  if (walKey.hasNonceGroup()) {
    this.nonceGroup = walKey.getNonceGroup();
  }
  if (walKey.hasNonce()) {
    this.nonce = walKey.getNonce();
  }
  this.replicationScope = null;
  if (walKey.getScopesCount() > 0) {
    this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (FamilyScope scope : walKey.getScopesList()) {
      byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
        uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
      this.replicationScope.put(family, scope.getScopeType().getNumber());
    }
  }
  setSequenceId(walKey.getLogSequenceNumber());
  this.writeTime = walKey.getWriteTime();
  if (walKey.hasOrigSequenceNumber()) {
    this.origLogSeqNum = walKey.getOrigSequenceNumber();
  }
}
Project: hbase    File: WALSplitter.java
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
    FileSystem fs, LastSequenceId idChecker,
    SplitLogWorkerCoordination splitLogWorkerCoordination) {
  this.conf = HBaseConfiguration.create(conf);
  String codecClassName = conf
      .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
  this.rootDir = rootDir;
  this.fs = fs;
  this.sequenceIdChecker = idChecker;
  this.splitLogWorkerCoordination = splitLogWorkerCoordination;

  this.walFactory = factory;
  PipelineController controller = new PipelineController();

  this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);

  entryBuffers = new EntryBuffers(controller,
      this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
      splitWriterCreationBounded);

  int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
  if (splitWriterCreationBounded) {
    outputSink = new BoundedLogWriterCreationOutputSink(
        controller, entryBuffers, numWriterThreads);
  } else {
    outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
  }
}
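This branch adds a bounded mode that caps how many recovered-edits writers are created at once, selected by the SPLIT_WRITER_CREATION_BOUNDED key read above. A hedged sketch of turning it on (assuming the constant is public, as it is in recent releases):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.WALSplitter;

public class BoundedSplitWriterSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Use BoundedLogWriterCreationOutputSink instead of one writer per region
    // (the constructor above defaults this to false).
    conf.setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
  }
}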
Project: ditb    File: WALKey.java
public void readFieldsFromPb(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
                             WALCellCodec.ByteStringUncompressor uncompressor)
    throws IOException {
  if (this.compressionContext != null) {
    this.encodedRegionName = uncompressor.uncompress(
        walKey.getEncodedRegionName(), compressionContext.regionDict);
    byte[] tablenameBytes = uncompressor.uncompress(
        walKey.getTableName(), compressionContext.tableDict);
    this.tablename = TableName.valueOf(tablenameBytes);
  } else {
    this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
    this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
  }
  clusterIds.clear();
  if (walKey.hasClusterId()) {
    // When we are reading an older log (0.95.1 release),
    // this is definitely the originating cluster.
    clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
        .getLeastSigBits()));
  }
  for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
    clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
  }
  if (walKey.hasNonceGroup()) {
    this.nonceGroup = walKey.getNonceGroup();
  }
  if (walKey.hasNonce()) {
    this.nonce = walKey.getNonce();
  }
  this.scopes = null;
  if (walKey.getScopesCount() > 0) {
    this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (FamilyScope scope : walKey.getScopesList()) {
      byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
        uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
      this.scopes.put(family, scope.getScopeType().getNumber());
    }
  }
  this.logSeqNum = walKey.getLogSequenceNumber();
  this.writeTime = walKey.getWriteTime();
  if (walKey.hasOrigSequenceNumber()) {
    this.origLogSeqNum = walKey.getOrigSequenceNumber();
  }
}
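getBuilder and readFieldsFromPb are inverses: one serializes a WALKey into its protobuf form, optionally compressing names through the dictionary compressor, and the other rebuilds the key from that form. A minimal round-trip sketch, assuming no CompressionContext is set on either key (so both codec arguments may be null) and that this branch's public no-arg WALKey constructor is available:

import java.io.IOException;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.wal.WALKey;

public class WALKeyPbRoundTripSketch {
  // With no CompressionContext configured, getBuilder/readFieldsFromPb never
  // touch the compressor/uncompressor, so null is safe for both arguments.
  static WALKey roundTrip(WALKey original) throws IOException {
    WALProtos.WALKey pb = original.getBuilder(null).build();
    WALKey restored = new WALKey();
    restored.readFieldsFromPb(pb, null);
    return restored;
  }
}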
Project: pbase    File: WALKey.java
public void readFieldsFromPb(
    org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
    WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
  if (this.compressionContext != null) {
    this.encodedRegionName = uncompressor.uncompress(
        walKey.getEncodedRegionName(), compressionContext.regionDict);
    byte[] tablenameBytes = uncompressor.uncompress(
        walKey.getTableName(), compressionContext.tableDict);
    this.tablename = TableName.valueOf(tablenameBytes);
  } else {
    this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
    this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
  }
  clusterIds.clear();
  if (walKey.hasClusterId()) {
    // When we are reading an older log (0.95.1 release),
    // this is definitely the originating cluster.
    clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
        .getLeastSigBits()));
  }
  for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
    clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
  }
  if (walKey.hasNonceGroup()) {
    this.nonceGroup = walKey.getNonceGroup();
  }
  if (walKey.hasNonce()) {
    this.nonce = walKey.getNonce();
  }
  this.scopes = null;
  if (walKey.getScopesCount() > 0) {
    this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (FamilyScope scope : walKey.getScopesList()) {
      byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
        uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
      this.scopes.put(family, scope.getScopeType().getNumber());
    }
  }
  this.logSeqNum = walKey.getLogSequenceNumber();
  this.writeTime = walKey.getWriteTime();
  if (walKey.hasOrigSequenceNumber()) {
    this.origLogSeqNum = walKey.getOrigSequenceNumber();
  }
}
Project: hbase    File: TestWALReaderOnSecureWAL.java
private Path writeWAL(final WALFactory wals, final String tblName, boolean offheap)
    throws IOException {
  Configuration conf = TEST_UTIL.getConfiguration();
  String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
    WALCellCodec.class);
  try {
    TableName tableName = TableName.valueOf(tblName);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(tableName.getName(), 0);
    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
    final int total = 10;
    final byte[] row = Bytes.toBytes("row");
    final byte[] family = Bytes.toBytes("family");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);

    // Write the WAL
    WAL wal = wals.getWAL(regionInfo);
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      KeyValue kv = new KeyValue(row, family, Bytes.toBytes(i), value);
      if (offheap) {
        ByteBuffer bb = ByteBuffer.allocateDirect(kv.getBuffer().length);
        bb.put(kv.getBuffer());
        ByteBufferKeyValue offheapKV = new ByteBufferKeyValue(bb, 0, kv.getLength());
        kvs.add(offheapKV);
      } else {
        kvs.add(kv);
      }
      wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis(), mvcc, scopes), kvs, true);
    }
    wal.sync();
    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
    wal.shutdown();

    return walPath;
  } finally {
    // restore the cell codec class
    conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
  }
}
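To check what was written, the surrounding test typically reads the file back through the same factory, which resolves the codec from the configuration again. A hedged read-back sketch (WALFactory.createReader and WAL.Reader are the APIs this branch exposes; treat the exact signatures as assumptions):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;

public class WalReadBackSketch {
  // Count the entries in a WAL file; the reader decodes cells with whatever
  // codec hbase.regionserver.wal.codec currently names.
  static int countEntries(WALFactory wals, FileSystem fs, Path walPath) throws Exception {
    int count = 0;
    try (WAL.Reader reader = wals.createReader(fs, walPath)) {
      while (reader.next() != null) {
        count++;
      }
    }
    return count;
  }
}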
Project: hbase    File: WALPlayer.java
/**
 * Sets up the actual job.
 *
 * @param args  The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public Job createSubmittableJob(String[] args) throws IOException {
  Configuration conf = getConf();
  setupTime(conf, WALInputFormat.START_TIME_KEY);
  setupTime(conf, WALInputFormat.END_TIME_KEY);
  String inputDirs = args[0];
  String[] tables = args[1].split(",");
  String[] tableMap;
  if (args.length > 2) {
    tableMap = args[2].split(",");
    if (tableMap.length != tables.length) {
      throw new IOException("The same number of tables and mapping must be provided.");
    }
  } else {
    // if no mapping is specified, map each table to itself
    tableMap = tables;
  }
  conf.setStrings(TABLES_KEY, tables);
  conf.setStrings(TABLE_MAP_KEY, tableMap);
  conf.set(FileInputFormat.INPUT_DIR, inputDirs);
  Job job = Job.getInstance(conf,
      conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
  job.setJarByClass(WALPlayer.class);

  job.setInputFormatClass(WALInputFormat.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);

  String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
  if (hfileOutPath != null) {
    LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);

    // the bulk HFile case
    if (tables.length != 1) {
      throw new IOException("Exactly one table must be specified for the bulk export option");
    }
    TableName tableName = TableName.valueOf(tables[0]);
    job.setMapperClass(WALKeyValueMapper.class);
    job.setReducerClass(CellSortReducer.class);
    Path outputDir = new Path(hfileOutPath);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapOutputValueClass(MapReduceExtendedCell.class);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    }
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
  } else {
    // output to live cluster
    job.setMapperClass(WALMapper.class);
    job.setOutputFormatClass(MultiTableOutputFormat.class);
    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    // No reducers.
    job.setNumReduceTasks(0);
  }
  String codecCls = WALCellCodec.getWALCellCodecClass(conf);
  try {
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        Class.forName(codecCls));
  } catch (Exception e) {
    throw new IOException("Cannot determine wal codec class " + codecCls, e);
  }
  return job;
}
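createSubmittableJob is normally driven through Hadoop's Tool interface rather than called directly. A hedged invocation sketch (the WAL directory and table name are hypothetical placeholders):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WalPlayerDriverSketch {
  public static void main(String[] args) throws Exception {
    // Replay every WAL found under /hbase/oldWALs into the table "mytable".
    int exitCode = ToolRunner.run(HBaseConfiguration.create(), new WALPlayer(),
        new String[] { "/hbase/oldWALs", "mytable" });
    System.exit(exitCode);
  }
}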