/**
 * Blocks until only one log (the one currently being written) is left in the replication queue.
 * <p>
 * For each region server index in {@code [0, numRs)}, the current WAL file name for the first
 * region of {@code htable1} is compared with the current path of every replication source on that
 * server. The condition holds once no source reports a different path. The condition is polled
 * through {@link Waiter#waitFor} with a timeout value of 10000.
 * @param numRs number of regionservers
 * @throws Exception if evaluating the condition fails or {@link Waiter#waitFor} throws
 */
private void waitForLogAdvance(int numRs) throws Exception {
  Waiter.Predicate<Exception> onlyCurrentLogQueued = new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      int rsIndex = 0;
      while (rsIndex < numRs) {
        HRegionServer regionServer = utility1.getHBaseCluster().getRegionServer(rsIndex);
        RegionInfo firstRegion =
          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
        WAL regionWal = regionServer.getWAL(firstRegion);
        Path activeWalFile = ((AbstractFSWAL<?>) regionWal).getCurrentFileName();
        Replication replication = (Replication) utility1.getHBaseCluster()
          .getRegionServer(rsIndex).getReplicationSourceService();
        for (ReplicationSourceInterface candidate : replication.getReplicationManager()
          .getSources()) {
          ReplicationSource replicationSource = (ReplicationSource) candidate;
          boolean onActiveWal = activeWalFile.equals(replicationSource.getCurrentPath());
          if (!onActiveWal) {
            // At least one source is still reading a different log; keep waiting.
            return false;
          }
        }
        rsIndex++;
      }
      return true;
    }
  };
  Waiter.waitFor(conf1, 10000, onlyCurrentLogQueued);
}
/** * Tests that {@link ReplicationSource#terminate(String)} will timeout properly */ @Test public void testTerminateTimeout() throws Exception { ReplicationSource source = new ReplicationSource(); ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() { @Override protected void doStart() { notifyStarted(); } @Override protected void doStop() { // not calling notifyStopped() here causes the caller of stop() to get a Future that never // completes } }; replicationEndpoint.start(); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit(new Runnable() { @Override public void run() { source.terminate("testing source termination"); } }); long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return future.isDone(); } }); }