/**
 * Starts a two-node mini cluster configured with the {@code walProvider} under test, a 2000 ms
 * ZooKeeper session timeout and {@code IgnoreYouAreDeadRS} as the region server implementation,
 * then seeds {@code TABLE_NAME} with rows 0-19, flushing once after rows 0-9 and once after
 * rows 10-19.
 *
 * @throws Exception if the cluster cannot be started or the seed data cannot be written
 */
@Before
public void setUp() throws Exception {
  UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, walProvider, WALProvider.class);
  UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
  UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, IgnoreYouAreDeadRS.class,
    HRegionServer.class);
  UTIL.startMiniCluster(2);
  // The Table handle is only needed to seed data; close it so each test run does not leak it.
  try (Table table = UTIL.createTable(TABLE_NAME, CF)) {
    for (int i = 0; i < 10; i++) {
      table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
    }
    UTIL.getAdmin().flush(TABLE_NAME);
    for (int i = 10; i < 20; i++) {
      table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
    }
    UTIL.getAdmin().flush(TABLE_NAME);
  }
}
/**
 * Copies every entry of the WAL at {@code input} into a new WAL at {@code output}, writing the
 * output with WAL compression set to the opposite of what the input reader reports.
 * <p>
 * If the reader created for {@code input} is not a {@code ReaderBase}, an error is printed to
 * {@code System.err} and nothing is written.
 *
 * @param input path of the WAL file to read
 * @param output path of the WAL file to create
 * @throws IOException if reading, writing or closing either file fails
 */
private static void transformFile(Path input, Path output) throws IOException {
  Configuration conf = HBaseConfiguration.create();

  FileSystem inFS = input.getFileSystem(conf);
  FileSystem outFS = output.getFileSystem(conf);

  WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
  WALProvider.Writer out = null;

  try {
    if (!(in instanceof ReaderBase)) {
      System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
      return;
    }
    boolean compress = ((ReaderBase) in).hasCompression();
    // Flip the compression setting so the output uses the opposite mode of the input.
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
    out = WALFactory.createWALWriter(outFS, output, conf);

    WAL.Entry e;
    while ((e = in.next()) != null) {
      out.append(e);
    }
  } finally {
    // Nested finally: a failure while closing the reader must not leave the writer open.
    try {
      in.close();
    } finally {
      if (out != null) {
        out.close();
      }
    }
  }
}
/**
 * Sanity check that we can move logs around while we are reading
 * from them. Should this test fail, ReplicationSource would have a hard
 * time reading logs that are being archived.
 * <p>
 * Writes three entries, reads the first, renames the file into {@code oldLogDir}, and then
 * expects the two remaining entries followed by end-of-log from the already-open reader.
 * @throws Exception if any file system or WAL operation fails
 */
@Test
public void testLogMoving() throws Exception {
  Path logPath = new Path(logDir, "log");
  if (!FS.exists(logDir)) {
    FS.mkdirs(logDir);
  }
  if (!FS.exists(oldLogDir)) {
    FS.mkdirs(oldLogDir);
  }

  WALProvider.Writer writer =
    WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
  try {
    for (int i = 0; i < 3; i++) {
      byte[] b = Bytes.toBytes(Integer.toString(i));
      KeyValue kv = new KeyValue(b, b, b);
      WALEdit edit = new WALEdit();
      edit.add(kv);
      WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
      writer.append(new WAL.Entry(key, edit));
      writer.sync();
    }
  } finally {
    writer.close();
  }

  WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
  try {
    WAL.Entry entry = reader.next();
    assertNotNull(entry);

    // Move the file out from under the open reader.
    Path oldLogPath = new Path(oldLogDir, "log");
    FS.rename(logPath, oldLogPath);

    entry = reader.next();
    assertNotNull(entry);

    // Third entry: assert it explicitly, otherwise a reader that stops early after the rename
    // would still satisfy the final assertNull.
    entry = reader.next();
    assertNotNull(entry);

    entry = reader.next();
    assertNull(entry);
  } finally {
    // Release the reader even when an assertion above fails.
    reader.close();
  }
}
/**
 * One-time fixture setup: takes the shared test configuration, selects
 * {@code SequenceFileLogWriter} as the WAL writer implementation, and creates the
 * {@code testReadOldRootAndMetaEdits} working directory under a fresh root dir.
 *
 * @throws Exception if the root directory or the working directory cannot be created
 */
@BeforeClass
public static void setupBeforeClass() throws Exception {
  conf = TEST_UTIL.getConfiguration();
  conf.setClass(
    "hbase.regionserver.hlog.writer.impl",
    SequenceFileLogWriter.class,
    WALProvider.Writer.class);

  fs = TEST_UTIL.getTestFileSystem();

  Path rootDir = TEST_UTIL.createRootDir();
  dir = new Path(rootDir, "testReadOldRootAndMetaEdits");
  fs.mkdirs(dir);
}
/** * Load the replication executorService objects, if any */ private static void createNewReplicationInstance(Configuration conf, HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { return; } // read in the name of the source replication class from the config file. String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); // read in the name of the sink replication class from the config file. String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); // If both the sink and the source class names are the same, then instantiate // only one object. if (sourceClassname.equals(sinkClassname)) { server.replicationSourceHandler = newReplicationInstance(sourceClassname, ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; } else { server.replicationSourceHandler = newReplicationInstance(sourceClassname, ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); server.replicationSinkHandler = newReplicationInstance(sinkClassname, ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); } }
/**
 * Loads {@code classname} through the current thread's context class loader, instantiates it
 * via {@code ReflectionUtils.newInstance} and initializes it as a replication service.
 *
 * @param classname fully qualified name of the service class to load
 * @param xface the service interface the loaded class must be a subclass of
 * @param conf configuration handed to the new instance
 * @param server server passed to {@code initialize}
 * @param walFs file system passed to {@code initialize}
 * @param logDir WAL directory passed to {@code initialize}
 * @param oldLogDir old-WAL directory passed to {@code initialize}
 * @param walProvider WAL provider passed to {@code initialize}
 * @return the initialized service instance
 * @throws IOException if the class cannot be found (the {@code ClassNotFoundException} is
 *           attached as the cause) or if initialization fails
 */
private static <T extends ReplicationService> T newReplicationInstance(String classname,
    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
    Path oldLogDir, WALProvider walProvider) throws IOException {
  final Class<? extends T> clazz;
  try {
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
  } catch (java.lang.ClassNotFoundException nfe) {
    // Chain the original exception so the class-loading failure details are not lost.
    throw new IOException("Could not find class for " + classname, nfe);
  }
  T service = ReflectionUtils.newInstance(clazz, conf);
  service.initialize(server, walFs, logDir, oldLogDir, walProvider);
  return service;
}
/**
 * Sanity check that we can move logs around while we are reading
 * from them. Should this test fail, ReplicationSource would have a hard
 * time reading logs that are being archived.
 * <p>
 * Writes three entries, reads the first, renames the file into {@code oldLogDir}, and then
 * expects the two remaining entries followed by end-of-log from the already-open reader.
 * @throws Exception if any file system or WAL operation fails
 */
@Test
public void testLogMoving() throws Exception {
  Path logPath = new Path(logDir, "log");
  if (!FS.exists(logDir)) {
    FS.mkdirs(logDir);
  }
  if (!FS.exists(oldLogDir)) {
    FS.mkdirs(oldLogDir);
  }

  WALProvider.Writer writer =
    WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
  try {
    for (int i = 0; i < 3; i++) {
      byte[] b = Bytes.toBytes(Integer.toString(i));
      KeyValue kv = new KeyValue(b, b, b);
      WALEdit edit = new WALEdit();
      edit.add(kv);
      WALKeyImpl key =
        new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
      writer.append(new WAL.Entry(key, edit));
      writer.sync();
    }
  } finally {
    writer.close();
  }

  WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
  try {
    WAL.Entry entry = reader.next();
    assertNotNull(entry);

    // Move the file out from under the open reader.
    Path oldLogPath = new Path(oldLogDir, "log");
    FS.rename(logPath, oldLogPath);

    entry = reader.next();
    assertNotNull(entry);

    // Third entry: assert it explicitly, otherwise a reader that stops early after the rename
    // would still satisfy the final assertNull.
    entry = reader.next();
    assertNotNull(entry);

    entry = reader.next();
    assertNull(entry);
  } finally {
    // Release the reader even when an assertion above fails.
    reader.close();
  }
}
@Test public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { String method = "testSkipRecoveredEditsReplayTheLastFileIgnored"; TableName tableName = TableName.valueOf(method); byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family); final WALFactory wals = new WALFactory(CONF, null, method); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); assertEquals(0, region.getStoreFileList( region.getStores().keySet().toArray(new byte[0][])).size()); Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); long maxSeqId = 1050; long minSeqId = 1000; for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); long time = System.nanoTime(); WALEdit edit = null; if (i == maxSeqId) { edit = WALEdit.createCompaction(region.getRegionInfo(), CompactionDescriptor.newBuilder() .setTableName(ByteString.copyFrom(tableName.getName())) .setFamilyName(ByteString.copyFrom(regionName)) .setEncodedRegionName(ByteString.copyFrom(regionName)) .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString()))) .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName())) .build()); } else { edit = new WALEdit(); edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes .toBytes(i))); } writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time, HConstants.DEFAULT_CLUSTER_ID), edit)); writer.close(); } long recoverSeqId = 1030; Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); MonitoredTask status = TaskMonitor.get().createStatus(method); for (Store store : region.getStores().values()) { 
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1); } long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); // assert that the files are flushed assertEquals(1, region.getStoreFileList( region.getStores().keySet().toArray(new byte[0][])).size()); } finally { HRegion.closeHRegion(this.region); this.region = null; wals.close(); } }
/** * Inserts three waledits in the wal file, and reads them back. The first edit is of a regular * table, second waledit is for the ROOT table (it will be ignored while reading), * and last waledit is for the hbase:meta table, which will be linked to the new system:meta table. * @throws IOException */ @Test public void testReadOldRootAndMetaEdits() throws IOException { LOG.debug("testReadOldRootAndMetaEdits"); // kv list to be used for all WALEdits. byte[] row = Bytes.toBytes("row"); KeyValue kv = new KeyValue(row, row, row, row); List<KeyValue> kvs = new ArrayList<KeyValue>(); kvs.add(kv); WALProvider.Writer writer = null; WAL.Reader reader = null; // a regular table TableName t = TableName.valueOf("t"); HRegionInfo tRegionInfo = null; int logCount = 0; long timestamp = System.currentTimeMillis(); Path path = new Path(dir, "t"); try { tRegionInfo = new HRegionInfo(t, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); WAL.Entry tEntry = createAEntry(new HLogKey(tRegionInfo.getEncodedNameAsBytes(), t, ++logCount, timestamp, HConstants.DEFAULT_CLUSTER_ID), kvs); // create a old root edit (-ROOT-). WAL.Entry rootEntry = createAEntry(new HLogKey(Bytes.toBytes(TableName.OLD_ROOT_STR), TableName.OLD_ROOT_TABLE_NAME, ++logCount, timestamp, HConstants.DEFAULT_CLUSTER_ID), kvs); // create a old meta edit (hbase:meta). WAL.Entry oldMetaEntry = createAEntry(new HLogKey(Bytes.toBytes(TableName.OLD_META_STR), TableName.OLD_META_TABLE_NAME, ++logCount, timestamp, HConstants.DEFAULT_CLUSTER_ID), kvs); // write above entries writer = WALFactory.createWALWriter(fs, path, conf); writer.append(tEntry); writer.append(rootEntry); writer.append(oldMetaEntry); // sync/close the writer writer.sync(); writer.close(); // read the log and see things are okay. 
reader = WALFactory.createReader(fs, path, conf); WAL.Entry entry = reader.next(); assertNotNull(entry); assertTrue(entry.getKey().getTablename().equals(t)); assertEquals(Bytes.toString(entry.getKey().getEncodedRegionName()), Bytes.toString(tRegionInfo.getEncodedNameAsBytes())); // read the ROOT waledit, but that will be ignored, and hbase:meta waledit will be read instead. entry = reader.next(); assertEquals(entry.getKey().getTablename(), TableName.META_TABLE_NAME); // should reach end of log assertNull(reader.next()); } finally { if (writer != null) { writer.close(); } if (reader != null) { reader.close(); } } }
/**
 * Wires up this replication service: records the server and its configuration, creates the
 * single-threaded daemon scheduler used for statistics, validates the bulk-load replication
 * settings, builds the queue storage / peers / tracker from the server's ZooKeeper handle, and
 * finally creates the {@code ReplicationSourceManager} and the peer procedure handler.
 * <p>
 * When {@code walProvider} is {@code null}, the source manager receives a length provider that
 * always returns {@code OptionalLong.empty()} and no WAL actions listener is registered.
 *
 * @throws IllegalArgumentException if bulk-load replication is enabled but
 *           {@code HConstants.REPLICATION_CLUSTER_ID} is unset or empty
 * @throws IOException if the replication components cannot be created or the cluster id
 *           cannot be read
 */
@Override
public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
    WALProvider walProvider) throws IOException {
  this.server = server;
  this.conf = this.server.getConfiguration();
  this.isReplicationForBulkLoadDataEnabled =
    ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);

  String statsThreadNameFormat =
    server.getServerName().toShortString() + "Replication Statistics #%d";
  this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
    new ThreadFactoryBuilder().setNameFormat(statsThreadNameFormat).setDaemon(true).build());

  if (this.isReplicationForBulkLoadDataEnabled) {
    // Bulk-load replication needs a cluster id to be configured.
    String replicationClusterId = conf.get(HConstants.REPLICATION_CLUSTER_ID);
    if (replicationClusterId == null || replicationClusterId.isEmpty()) {
      throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
        + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
        + " is set to true.");
    }
  }

  try {
    this.queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
    this.replicationPeers =
      ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
    this.replicationPeers.init();
    this.replicationTracker = ReplicationFactory.getReplicationTracker(
      server.getZooKeeper(), this.server, this.server);
  } catch (Exception e) {
    throw new IOException("Failed replication handler create", e);
  }

  final UUID clusterId;
  try {
    clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
  } catch (KeeperException ke) {
    throw new IOException("Could not read cluster id", ke);
  }

  this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
    replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
    walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());

  if (walProvider != null) {
    walProvider.addWALActionsListener(
      new ReplicationSourceWALActionListener(conf, replicationManager));
  }

  this.statsThreadPeriod =
    this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
  LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
  this.replicationLoad = new ReplicationLoad();
  this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
}
@Test public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family); final WALFactory wals = new WALFactory(CONF, method); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]); assertEquals(0, region.getStoreFileList(columns).size()); Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); long maxSeqId = 1050; long minSeqId = 1000; for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); long time = System.nanoTime(); WALEdit edit = null; if (i == maxSeqId) { edit = WALEdit.createCompaction(region.getRegionInfo(), CompactionDescriptor.newBuilder() .setTableName(ByteString.copyFrom(tableName.getName())) .setFamilyName(ByteString.copyFrom(regionName)) .setEncodedRegionName(ByteString.copyFrom(regionName)) .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString()))) .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName())) .build()); } else { edit = new WALEdit(); edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes .toBytes(i))); } writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, HConstants.DEFAULT_CLUSTER_ID), edit)); writer.close(); } long recoverSeqId = 1030; Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); MonitoredTask status = TaskMonitor.get().createStatus(method); for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); } 
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); // assert that the files are flushed assertEquals(1, region.getStoreFileList(columns).size()); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); this.region = null; wals.close(); } }
/**
 * Initializes the replication service object.
 * @param rs the server this replication service is being initialized for
 * @param fs file system handed to the service; NOTE(review): presumably the file system that
 *          holds the WAL directories - confirm against callers
 * @param logdir directory of the live WALs; NOTE(review): inferred from the name - confirm
 * @param oldLogDir directory of the archived WALs; NOTE(review): inferred from the name -
 *          confirm
 * @param walProvider can be null if not initialized inside a live region server environment, for
 *          example, {@code ReplicationSyncUp}.
 * @throws IOException if the service cannot be initialized
 */
void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) throws IOException;