/** * @throws java.lang.Exception */ @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); Configuration conf = TEST_UTIL.getConfiguration(); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); admin = new ReplicationAdmin(conf); Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); Path logDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME); manager = new ReplicationSourceManager(admin.getReplicationZk(), conf, // The following stopper never stops so that we can respond // to zk notification new Stoppable() { @Override public void stop(String why) {} @Override public boolean isStopped() {return false;} }, FileSystem.get(conf), replicating, logDir, oldLogDir); }
@Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; }
@Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, Stoppable stopper, AtomicBoolean replicating, String peerClusterId) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; }
@Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, UUID clusterId) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; }
@Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; this.metrics = metrics; this.walFileLengthProvider = walFileLengthProvider; }
/** * Tests that {@link ReplicationSource#terminate(String)} will timeout properly */ @Test public void testTerminateTimeout() throws Exception { ReplicationSource source = new ReplicationSource(); ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() { @Override protected void doStart() { notifyStarted(); } @Override protected void doStop() { // not calling notifyStopped() here causes the caller of stop() to get a Future that never // completes } }; replicationEndpoint.start(); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit(new Runnable() { @Override public void run() { source.terminate("testing source termination"); } }); long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return future.isDone(); } }); }
/** * Tests that recovered queues are preserved on a regionserver shutdown. * See HBASE-18192 * @throws Exception */ @Test public void testServerShutdownRecoveredQueue() throws Exception { try { // Ensure single-threaded WAL conf.set("hbase.wal.provider", "defaultProvider"); conf.setInt("replication.sleep.before.failover", 2000); // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); TEST_UTIL_PEER.startMiniCluster(1); HRegionServer serverA = cluster.getRegionServer(0); final ReplicationSourceManager managerA = ((Replication) serverA.getReplicationSourceService()).getReplicationManager(); HRegionServer serverB = cluster.getRegionServer(1); final ReplicationSourceManager managerB = ((Replication) serverB.getReplicationSourceService()).getReplicationManager(); final Admin admin = TEST_UTIL.getAdmin(); final String peerId = "TestPeer"; admin.addReplicationPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey())); // Wait for replication sources to come up Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); } }); // Disabling peer makes sure there is at least one log to claim when the server dies // The recovered queue will also stay there until the peer is disabled even if the // WALs it contains have no data. admin.disableReplicationPeer(peerId); // Stopping serverA // It's queues should be claimed by the only other alive server i.e. serverB cluster.stopRegionServer(serverA.getServerName()); Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return managerB.getOldSources().size() == 1; } }); final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); serverC.waitForServerOnline(); Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return serverC.getReplicationSourceService() != null; } }); final ReplicationSourceManager managerC = ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); // Sanity check assertEquals(0, managerC.getOldSources().size()); // Stopping serverB // Now serverC should have two recovered queues: // 1. The serverB's normal queue // 2. serverA's recovered queue on serverB cluster.stopRegionServer(serverB.getServerName()); Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return managerC.getOldSources().size() == 2; } }); admin.enableReplicationPeer(peerId); Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return managerC.getOldSources().size() == 0; } }); } finally { conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); } }
@Override public ReplicationSourceManager getSourceManager() { return manager; }