public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
    WALCellCodec.ByteStringCompressor compressor) throws IOException {
  org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
  if (compressionContext == null) {
    builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
    builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
  } else {
    builder.setEncodedRegionName(
        compressor.compress(this.encodedRegionName, compressionContext.regionDict));
    builder.setTableName(
        compressor.compress(this.tablename.getName(), compressionContext.tableDict));
  }
  builder.setLogSequenceNumber(this.logSeqNum);
  builder.setWriteTime(writeTime);
  if (this.origLogSeqNum > 0) {
    builder.setOrigSequenceNumber(this.origLogSeqNum);
  }
  if (this.nonce != HConstants.NO_NONCE) {
    builder.setNonce(nonce);
  }
  if (this.nonceGroup != HConstants.NO_NONCE) {
    builder.setNonceGroup(nonceGroup);
  }
  HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
  for (UUID clusterId : clusterIds) {
    uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
    builder.addClusterIds(uuidBuilder.build());
  }
  if (scopes != null) {
    for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
      ByteString family = (compressionContext == null)
          ? ByteStringer.wrap(e.getKey())
          : compressor.compress(e.getKey(), compressionContext.familyDict);
      builder.addScopes(FamilyScope.newBuilder()
          .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
    }
  }
  return builder;
}
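// Usage sketch, not from the source: when no CompressionContext is attached, the
// method above never dereferences the compressor, so a caller could pass null and
// serialize the key directly. The variable names here are hypothetical.
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey pbKey =
    walKey.getBuilder(null).build();
byte[] serialized = pbKey.toByteArray();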
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
    FileSystem fs, LastSequenceId idChecker,
    CoordinatedStateManager csm, RecoveryMode mode) {
  this.conf = HBaseConfiguration.create(conf);
  String codecClassName =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
  this.rootDir = rootDir;
  this.fs = fs;
  this.sequenceIdChecker = idChecker;
  this.csm = (BaseCoordinatedStateManager) csm;
  this.walFactory = factory;
  this.controller = new PipelineController();

  entryBuffers = new EntryBuffers(controller,
      this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024));

  // a larger minBatchSize may slow down recovery because the replay writer has to
  // wait for enough edits before replaying them
  this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
  this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);

  this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
  if (csm != null && this.distributedLogReplay) {
    outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
  } else {
    if (this.distributedLogReplay) {
      LOG.info("ZooKeeperWatcher is passed in as NULL so disabling distributedLogReplay.");
    }
    this.distributedLogReplay = false;
    outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
  }
}
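// Tuning sketch with hypothetical values: the constructor above copies the passed
// Configuration and reads these keys from it, so overrides must be set before
// constructing the WALSplitter.
Configuration conf = HBaseConfiguration.create();
conf.setInt("hbase.regionserver.hlog.splitlog.buffersize", 256 * 1024 * 1024);
conf.setInt("hbase.regionserver.wal.logreplay.batch.size", 128);
conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 6);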
private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
  Configuration conf = TEST_UTIL.getConfiguration();
  String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
      WALCellCodec.class);
  try {
    TableName tableName = TableName.valueOf(tblName);
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(tableName.getName()));
    HRegionInfo regioninfo = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
    final int total = 10;
    final byte[] row = Bytes.toBytes("row");
    final byte[] family = Bytes.toBytes("family");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);

    // Write the WAL
    WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
      wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis(), mvcc), kvs, true);
    }
    wal.sync();
    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
    wal.shutdown();
    return walPath;
  } finally {
    // restore the cell codec class
    conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
  }
}
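// Read-back sketch, not from the source: a test could verify the file written above
// by re-reading it with the same WALFactory. Assumes WALFactory.createReader and
// WAL.Reader.next are available in this HBase version.
try (WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath)) {
  WAL.Entry entry;
  int count = 0;
  while ((entry = reader.next()) != null) {
    count++; // each entry carries one WALKey plus the WALEdit appended above
  }
  // expect as many entries as appends performed above (10)
}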
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
    FileSystem fs, LastSequenceId idChecker,
    CoordinatedStateManager csm, RecoveryMode mode) {
  this.conf = HBaseConfiguration.create(conf);
  String codecClassName =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
  this.rootDir = rootDir;
  this.fs = fs;
  this.sequenceIdChecker = idChecker;
  this.csm = (BaseCoordinatedStateManager) csm;
  this.walFactory = factory;

  entryBuffers = new EntryBuffers(
      this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024));

  // a larger minBatchSize may slow down recovery because the replay writer has to
  // wait for enough edits before replaying them
  this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
  this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);

  this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
  if (csm != null && this.distributedLogReplay) {
    outputSink = new LogReplayOutputSink(numWriterThreads);
  } else {
    if (this.distributedLogReplay) {
      LOG.info("ZooKeeperWatcher is passed in as NULL so disabling distributedLogReplay.");
    }
    this.distributedLogReplay = false;
    outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
  }
}
private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
  Configuration conf = TEST_UTIL.getConfiguration();
  String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
      WALCellCodec.class);
  try {
    TableName tableName = TableName.valueOf(tblName);
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(tableName.getName()));
    HRegionInfo regioninfo = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
    final int total = 10;
    final byte[] row = Bytes.toBytes("row");
    final byte[] family = Bytes.toBytes("family");
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path logDir = TEST_UTIL.getDataTestDir(tblName);
    final AtomicLong sequenceId = new AtomicLong(1);

    // Write the WAL
    WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
      wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis()), kvs, sequenceId, true, null);
    }
    wal.sync();
    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
    wal.shutdown();
    return walPath;
  } finally {
    // restore the cell codec class
    conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
  }
}
public WALProtos.WALKey.Builder getBuilder(
    WALCellCodec.ByteStringCompressor compressor) throws IOException {
  WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
  if (compressionContext == null) {
    builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName));
    builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName()));
  } else {
    builder.setEncodedRegionName(
        compressor.compress(this.encodedRegionName, compressionContext.regionDict));
    builder.setTableName(
        compressor.compress(this.tablename.getName(), compressionContext.tableDict));
  }
  builder.setLogSequenceNumber(getSequenceId());
  builder.setWriteTime(writeTime);
  if (this.origLogSeqNum > 0) {
    builder.setOrigSequenceNumber(this.origLogSeqNum);
  }
  if (this.nonce != HConstants.NO_NONCE) {
    builder.setNonce(nonce);
  }
  if (this.nonceGroup != HConstants.NO_NONCE) {
    builder.setNonceGroup(nonceGroup);
  }
  HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
  for (UUID clusterId : clusterIds) {
    uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
    builder.addClusterIds(uuidBuilder.build());
  }
  if (replicationScope != null) {
    for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
      ByteString family = (compressionContext == null)
          ? UnsafeByteOperations.unsafeWrap(e.getKey())
          : compressor.compress(e.getKey(), compressionContext.familyDict);
      builder.addScopes(FamilyScope.newBuilder()
          .setFamily(family).setScopeType(ScopeType.forNumber(e.getValue())));
    }
  }
  return builder;
}
public void readFieldsFromPb(WALProtos.WALKey walKey,
    WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
  if (this.compressionContext != null) {
    this.encodedRegionName = uncompressor.uncompress(
        walKey.getEncodedRegionName(), compressionContext.regionDict);
    byte[] tablenameBytes = uncompressor.uncompress(
        walKey.getTableName(), compressionContext.tableDict);
    this.tablename = TableName.valueOf(tablenameBytes);
  } else {
    this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
    this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
  }
  clusterIds.clear();
  for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
    clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
  }
  if (walKey.hasNonceGroup()) {
    this.nonceGroup = walKey.getNonceGroup();
  }
  if (walKey.hasNonce()) {
    this.nonce = walKey.getNonce();
  }
  this.replicationScope = null;
  if (walKey.getScopesCount() > 0) {
    this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (FamilyScope scope : walKey.getScopesList()) {
      byte[] family = (compressionContext == null)
          ? scope.getFamily().toByteArray()
          : uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
      this.replicationScope.put(family, scope.getScopeType().getNumber());
    }
  }
  setSequenceId(walKey.getLogSequenceNumber());
  this.writeTime = walKey.getWriteTime();
  if (walKey.hasOrigSequenceNumber()) {
    this.origLogSeqNum = walKey.getOrigSequenceNumber();
  }
}
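// Round-trip sketch, hypothetical: getBuilder and readFieldsFromPb above are
// symmetric, so a key rebuilt from its own protobuf form should carry the same
// fields, assuming both key instances share the same CompressionContext (or none).
// "original", "restored", "compressor", and "uncompressor" are placeholder names.
WALProtos.WALKey pb = original.getBuilder(compressor).build();
restored.readFieldsFromPb(pb, uncompressor);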
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
    FileSystem fs, LastSequenceId idChecker,
    SplitLogWorkerCoordination splitLogWorkerCoordination) {
  this.conf = HBaseConfiguration.create(conf);
  String codecClassName =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
  this.rootDir = rootDir;
  this.fs = fs;
  this.sequenceIdChecker = idChecker;
  this.splitLogWorkerCoordination = splitLogWorkerCoordination;
  this.walFactory = factory;
  PipelineController controller = new PipelineController();

  this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);

  entryBuffers = new EntryBuffers(controller,
      this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
      splitWriterCreationBounded);

  int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
  if (splitWriterCreationBounded) {
    outputSink =
        new BoundedLogWriterCreationOutputSink(controller, entryBuffers, numWriterThreads);
  } else {
    outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
  }
}
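// Sketch, hypothetical: selecting the bounded sink chosen in the constructor above.
// SPLIT_WRITER_CREATION_BOUNDED is the config-key constant it references; judging by
// its name, the bounded sink caps how many recovered-edits writers are created at once.
conf.setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);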
public void readFieldsFromPb(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
    WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
  if (this.compressionContext != null) {
    this.encodedRegionName = uncompressor.uncompress(
        walKey.getEncodedRegionName(), compressionContext.regionDict);
    byte[] tablenameBytes = uncompressor.uncompress(
        walKey.getTableName(), compressionContext.tableDict);
    this.tablename = TableName.valueOf(tablenameBytes);
  } else {
    this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
    this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
  }
  clusterIds.clear();
  if (walKey.hasClusterId()) {
    // When we are reading an older log (0.95.1 release),
    // this is definitely the originating cluster
    clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(),
        walKey.getClusterId().getLeastSigBits()));
  }
  for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
    clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
  }
  if (walKey.hasNonceGroup()) {
    this.nonceGroup = walKey.getNonceGroup();
  }
  if (walKey.hasNonce()) {
    this.nonce = walKey.getNonce();
  }
  this.scopes = null;
  if (walKey.getScopesCount() > 0) {
    this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (FamilyScope scope : walKey.getScopesList()) {
      byte[] family = (compressionContext == null)
          ? scope.getFamily().toByteArray()
          : uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
      this.scopes.put(family, scope.getScopeType().getNumber());
    }
  }
  this.logSeqNum = walKey.getLogSequenceNumber();
  this.writeTime = walKey.getWriteTime();
  if (walKey.hasOrigSequenceNumber()) {
    this.origLogSeqNum = walKey.getOrigSequenceNumber();
  }
}
private Path writeWAL(final WALFactory wals, final String tblName, boolean offheap)
    throws IOException {
  Configuration conf = TEST_UTIL.getConfiguration();
  String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
      WALCellCodec.class);
  try {
    TableName tableName = TableName.valueOf(tblName);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(tableName.getName(), 0);
    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
    final int total = 10;
    final byte[] row = Bytes.toBytes("row");
    final byte[] family = Bytes.toBytes("family");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);

    // Write the WAL
    WAL wal = wals.getWAL(regionInfo);
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      KeyValue kv = new KeyValue(row, family, Bytes.toBytes(i), value);
      if (offheap) {
        ByteBuffer bb = ByteBuffer.allocateDirect(kv.getBuffer().length);
        bb.put(kv.getBuffer());
        ByteBufferKeyValue offheapKV = new ByteBufferKeyValue(bb, 0, kv.getLength());
        kvs.add(offheapKV);
      } else {
        kvs.add(kv);
      }
      wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis(), mvcc, scopes), kvs, true);
    }
    wal.sync();
    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
    wal.shutdown();
    return walPath;
  } finally {
    // restore the cell codec class
    conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
  }
}
/**
 * Sets up the actual job.
 *
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public Job createSubmittableJob(String[] args) throws IOException {
  Configuration conf = getConf();
  setupTime(conf, WALInputFormat.START_TIME_KEY);
  setupTime(conf, WALInputFormat.END_TIME_KEY);
  String inputDirs = args[0];
  String[] tables = args[1].split(",");
  String[] tableMap;
  if (args.length > 2) {
    tableMap = args[2].split(",");
    if (tableMap.length != tables.length) {
      throw new IOException("The same number of tables and mappings must be provided.");
    }
  } else {
    // if no mapping is specified, map each table to itself
    tableMap = tables;
  }
  conf.setStrings(TABLES_KEY, tables);
  conf.setStrings(TABLE_MAP_KEY, tableMap);
  conf.set(FileInputFormat.INPUT_DIR, inputDirs);
  Job job = Job.getInstance(conf,
      conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
  job.setJarByClass(WALPlayer.class);

  job.setInputFormatClass(WALInputFormat.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);

  String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
  if (hfileOutPath != null) {
    LOG.debug("add incremental job: " + hfileOutPath + " from " + inputDirs);

    // the bulk HFile case
    if (tables.length != 1) {
      throw new IOException("Exactly one table must be specified for the bulk export option");
    }
    TableName tableName = TableName.valueOf(tables[0]);
    job.setMapperClass(WALKeyValueMapper.class);
    job.setReducerClass(CellSortReducer.class);
    Path outputDir = new Path(hfileOutPath);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapOutputValueClass(MapReduceExtendedCell.class);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    }
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
  } else {
    // output to live cluster
    job.setMapperClass(WALMapper.class);
    job.setOutputFormatClass(MultiTableOutputFormat.class);
    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    // No reducers.
    job.setNumReduceTasks(0);
  }
  String codecCls = WALCellCodec.getWALCellCodecClass(conf);
  try {
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        Class.forName(codecCls));
  } catch (Exception e) {
    throw new IOException("Cannot determine wal codec class " + codecCls, e);
  }
  return job;
}
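// Invocation sketch, not from the source: WALPlayer is a Tool, so the job built
// above is normally driven through ToolRunner with arguments
//   <wal-input-dir> <comma-separated-tables> [<comma-separated-table-mappings>].
// The path and table names below are hypothetical.
int exitCode = ToolRunner.run(HBaseConfiguration.create(), new WALPlayer(),
    new String[] { "/hbase/oldWALs", "sourceTable", "clonedTable" });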