Java 类 org.apache.hadoop.hbase.replication.regionserver.ReplicationSource 实例源码

项目:hbase    文件:TestReplicationEmptyWALRecovery.java   
/**
 * Blocks until every replication source on each inspected region server reports, as its current
 * path, the WAL file that the server is currently writing.
 * <p>
 * The condition is polled through {@code Waiter.waitFor}, which is given {@code conf1} and a
 * timeout argument of 10000.
 * @param numRs number of region servers to inspect; indexes {@code 0} to {@code numRs - 1}
 * @throws Exception propagated from {@code Waiter.waitFor} or from the polled check
 */
private void waitForLogAdvance(int numRs) throws Exception {
  Waiter.Predicate<Exception> sourcesOnCurrentWal = new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      for (int serverIndex = 0; serverIndex < numRs; serverIndex++) {
        HRegionServer regionServer = utility1.getHBaseCluster().getRegionServer(serverIndex);
        // The WAL is looked up through the first region of the test table.
        RegionInfo firstRegion =
            utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
        WAL serverWal = regionServer.getWAL(firstRegion);
        Path activeWalFile = ((AbstractFSWAL<?>) serverWal).getCurrentFileName();
        Replication replication = (Replication) utility1.getHBaseCluster()
            .getRegionServer(serverIndex).getReplicationSourceService();
        for (ReplicationSourceInterface candidate : replication.getReplicationManager()
            .getSources()) {
          if (activeWalFile.equals(((ReplicationSource) candidate).getCurrentPath())) {
            continue;
          }
          // This source is still positioned on a different file than the one being written.
          return false;
        }
      }
      return true;
    }
  };
  Waiter.waitFor(conf1, 10000, sourcesOnCurrentWal);
}
项目:hbase    文件:TestReplicationSource.java   
/**
 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly.
 * <p>
 * {@code terminate} is invoked on a separate single-thread executor, and the test then waits for
 * the resulting future to complete within twice the configured
 * {@code replication.source.sleepforretries} value (default 1000). The executor is always shut
 * down afterwards so its worker thread does not outlive the test.
 */
@Test
public void testTerminateTimeout() throws Exception {
  ReplicationSource source = new ReplicationSource();
  ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      // not calling notifyStopped() here causes the caller of stop() to get a Future that never
      // completes
    }
  };
  replicationEndpoint.start();
  // NOTE(review): the endpoint is started above but is not among the arguments handed to
  // source.init(...) below; confirm how the source is expected to pick it up.
  ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
  Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
  Configuration testConf = HBaseConfiguration.create();
  testConf.setInt("replication.source.maxretriesmultiplier", 1);
  ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
  Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
  source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
    p -> OptionalLong.empty(), null);
  ExecutorService executor = Executors.newSingleThreadExecutor();
  try {
    Future<?> future = executor.submit(new Runnable() {

      @Override
      public void run() {
        source.terminate("testing source termination");
      }
    });
    long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
    Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return future.isDone();
      }

    });
  } finally {
    // Release the worker thread; if terminate() is still running because the wait failed,
    // shutdownNow() also interrupts it instead of leaving it behind.
    executor.shutdownNow();
  }
}