Java 类org.apache.hadoop.hbase.replication.ReplicationEndpoint 实例源码

项目:hbase    文件:ReplicationSource.java   
private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
  RegionServerCoprocessorHost rsServerHost = null;
  if (server instanceof HRegionServer) {
    rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
  }
  String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();
  if (replicationEndpointImpl == null) {
    // Default to HBase inter-cluster replication endpoint
    replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
  }
  ReplicationEndpoint replicationEndpoint =
    Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
  if (rsServerHost != null) {
    ReplicationEndpoint newReplicationEndPoint =
      rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
    if (newReplicationEndPoint != null) {
      // Override the newly created endpoint from the hook with configured end point
      replicationEndpoint = newReplicationEndPoint;
    }
  }
  return replicationEndpoint;
}
项目:ditb    文件:ReplicationSource.java   
/**
 * Instantiation method used by region servers
 *
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param stopper     the atomic boolean to use to stop the regionserver
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for replication source
 * @throws IOException
 */
@Override
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics)
        throws IOException {
  this.stopper = stopper;
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
  this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
  this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
  long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.manager = manager;
  this.fs = fs;
  this.metrics = metrics;
  this.clusterId = clusterId;

  this.peerClusterZnode = peerClusterZnode;
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
  this.replicationEndpoint = replicationEndpoint;
}
项目:ditb    文件:RegionServerCoprocessorHost.java   
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
    throws IOException {
  return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
      : new CoprocessOperationWithResult<ReplicationEndpoint>() {
        @Override
        public void call(RegionServerObserver oserver,
            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
          setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
        }
      });
}
项目:ditb    文件:TestRegionReplicaReplicationEndpointNoMaster.java   
@Test (timeout = 240000)
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
  // tests replaying the edits to a secondary region replica using the RRRE.replicate()
  openRegion(HTU, rs0, hriSecondary);
  ClusterConnection connection =
      (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
  RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();

  ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
  when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
  when(context.getMetrics()).thenReturn(mock(MetricsSource.class));

  replicator.init(context);
  replicator.start();

  //load some data to primary
  HTU.loadNumericRows(table, f, 0, 1000);

  Assert.assertEquals(1000, entries.size());
  // replay the edits to the secondary using replay callable
  final String fakeWalGroupId = "fakeWALGroup";
  replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
      .setWalGroupId(fakeWalGroupId));

  Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
  HTU.verifyNumericRows(region, f, 0, 1000);

  HTU.deleteNumericRows(table, f, 0, 1000);
  closeRegion(HTU, rs0, hriSecondary);
  connection.close();
}
项目:pbase    文件:RegionServerCoprocessorHost.java   
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
    throws IOException {
  return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
      : new CoprocessOperationWithResult<ReplicationEndpoint>() {
        @Override
        public void call(RegionServerObserver oserver,
            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
          setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
        }
      });
}
项目:hbase    文件:ReplicationSource.java   
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
  TableDescriptors tableDescriptors = null;
  if (server instanceof HRegionServer) {
    tableDescriptors = ((HRegionServer) server).getTableDescriptors();
  }
  replicationEndpoint
      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
          clusterId, replicationPeer, metrics, tableDescriptors, server));
  replicationEndpoint.start();
  replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
项目:hbase    文件:RegionServerCoprocessorHost.java   
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
    throws IOException {
  if (this.coprocEnvironments.isEmpty()) {
    return endpoint;
  }
  return execOperationWithResult(
      new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
          rsObserverGetter, endpoint) {
    @Override
    public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
      return observer.postCreateReplicationEndPoint(this, getResult());
    }
  });
}
项目:hbase    文件:TestRegionReplicaReplicationEndpointNoMaster.java   
@Test (timeout = 240000)
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
  // tests replaying the edits to a secondary region replica using the RRRE.replicate()
  openRegion(HTU, rs0, hriSecondary);
  ClusterConnection connection =
      (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
  RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();

  ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
  when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
  when(context.getMetrics()).thenReturn(mock(MetricsSource.class));

  replicator.init(context);
  replicator.startAsync();

  //load some data to primary
  HTU.loadNumericRows(table, f, 0, 1000);

  Assert.assertEquals(1000, entries.size());
  // replay the edits to the secondary using replay callable
  final String fakeWalGroupId = "fakeWALGroup";
  replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
      .setWalGroupId(fakeWalGroupId));

  Region region = rs0.getRegion(hriSecondary.getEncodedName());
  HTU.verifyNumericRows(region, f, 0, 1000);

  HTU.deleteNumericRows(table, f, 0, 1000);
  closeRegion(HTU, rs0, hriSecondary);
  connection.close();
}
项目:ditb    文件:ReplicationSource.java   
/**
 * Do the shipping logic
 * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
 * written to when this method was called
 */
protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
  int sleepMultiplier = 0;
  if (entries.isEmpty()) {
    LOG.warn("Was given 0 edits to ship");
    return;
  }
  while (isWorkerActive()) {
    try {
      if (throttler.isEnabled()) {
        long sleepTicks = throttler.getNextSleepInterval(currentSize);
        if (sleepTicks > 0) {
          try {
            if (LOG.isTraceEnabled()) {
              LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
            }
            Thread.sleep(sleepTicks);
          } catch (InterruptedException e) {
            LOG.debug("Interrupted while sleeping for throttling control");
            Thread.currentThread().interrupt();
            // current thread might be interrupted to terminate
            // directly go back to while() for confirm this
            continue;
          }
          // reset throttler's cycle start tick when sleep for throttling occurs
          throttler.resetStartTick();
        }
      }
      // create replicateContext here, so the entries can be GC'd upon return from this call
      // stack
      ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
      replicateContext.setEntries(entries).setSize(currentSize);
      replicateContext.setWalGroupId(walGroupId);

      long startTimeNs = System.nanoTime();
      // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
      boolean replicated = replicationEndpoint.replicate(replicateContext);
      long endTimeNs = System.nanoTime();

      if (!replicated) {
        continue;
      } else {
        sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
      }

      if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
        manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
          this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
          currentWALisBeingWrittenTo);
        this.lastLoggedPosition = this.repLogReader.getPosition();
      }
      if (throttler.isEnabled()) {
        throttler.addPushSize(currentSize);
      }
      totalReplicatedEdits.addAndGet(entries.size());
      totalReplicatedOperations.addAndGet(currentNbOperations);
      // FIXME check relationship between wal group and overall
      metrics.shipBatch(currentNbOperations, currentSize / 1024);
      metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
        walGroupId);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
            + totalReplicatedOperations + " operations in "
            + ((endTimeNs - startTimeNs) / 1000000) + " ms");
      }
      break;
    } catch (Exception ex) {
      LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
          + org.apache.hadoop.util.StringUtils.stringifyException(ex));
      if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
        sleepMultiplier++;
      }
    }
  }
}
项目:ditb    文件:VisibilityReplicationEndpoint.java   
public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  this.delegator = endpoint;
  this.visibilityLabelsService = visibilityLabelsService;
}
项目:ditb    文件:VisibilityController.java   
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
}
项目:ditb    文件:AccessController.java   
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}
项目:ditb    文件:BaseRegionServerObserver.java   
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}
项目:ditb    文件:TestVisibilityLabelsReplication.java   
public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  super(endpoint, visibilityLabelsService);
}
项目:pbase    文件:ReplicationSource.java   
/**
 * Instantiation method used by region servers
 *
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param stopper     the atomic boolean to use to stop the regionserver
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for replication source
 * @throws IOException
 */
@Override
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics)
        throws IOException {
  this.stopper = stopper;
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
  this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
  this.queue =
      new PriorityBlockingQueue<Path>(
          this.conf.getInt("hbase.regionserver.maxlogs", 32),
          new LogsComparator());
  long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.manager = manager;
  this.fs = fs;
  this.metrics = metrics;
  this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
  this.clusterId = clusterId;

  this.peerClusterZnode = peerClusterZnode;
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
  this.replicationEndpoint = replicationEndpoint;

  this.replicateContext = new ReplicationEndpoint.ReplicateContext();
}
项目:pbase    文件:VisibilityReplicationEndpoint.java   
public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  this.delegator = endpoint;
  this.visibilityLabelsService = visibilityLabelsService;
}
项目:pbase    文件:VisibilityController.java   
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
}
项目:pbase    文件:AccessController.java   
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}
项目:pbase    文件:BaseRegionServerObserver.java   
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}
项目:pbase    文件:TestVisibilityLabelsReplication.java   
public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  super(endpoint, visibilityLabelsService);
}
项目:hbase    文件:ReplicationSource.java   
@Override
public ReplicationEndpoint getReplicationEndpoint() {
  return this.replicationEndpoint;
}
项目:hbase    文件:VisibilityReplicationEndpoint.java   
public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  this.delegator = endpoint;
  this.visibilityLabelsService = visibilityLabelsService;
}
项目:hbase    文件:VisibilityReplication.java   
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
}
项目:hbase    文件:AccessController.java   
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}
项目:hbase    文件:TestVisibilityLabelsReplication.java   
public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  super(endpoint, visibilityLabelsService);
}
项目:spliceengine    文件:ShutdownRegionServerObserver.java   
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return null;
}
项目:spliceengine    文件:ShutdownRegionServerObserver.java   
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return endpoint;
}
项目:spliceengine    文件:ShutdownRegionServerObserver.java   
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return endpoint;
}
项目:spliceengine    文件:ShutdownRegionServerObserver.java   
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return endpoint;
}
项目:spliceengine    文件:ShutdownRegionServerObserver.java   
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return endpoint;
}
项目:spliceengine    文件:ShutdownRegionServerObserver.java   
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return null;
}
项目:spliceengine    文件:ShutdownRegionServerObserver.java   
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return endpoint;
}
项目:ditb    文件:ReplicationSourceInterface.java   
/**
 * Initializer for the source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues
 * @param replicationPeers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode
 * @param clusterId
 * @throws IOException
 */
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics) throws IOException;
项目:ditb    文件:RegionServerObserver.java   
/**
 * This will be called after the replication endpoint is instantiated.
 * @param ctx
 * @param endpoint - the base endpoint for replication
 * @return the endpoint to use during replication.
 */
ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint);
项目:pbase    文件:ReplicationSourceInterface.java   
/**
 * Initializer for the source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues
 * @param replicationPeers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode
 * @param clusterId
 * @throws IOException
 */
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics) throws IOException;
项目:pbase    文件:RegionServerObserver.java   
/**
 * This will be called after the replication endpoint is instantiated.
 * @param ctx
 * @param endpoint - the base endpoint for replication
 * @return the endpoint to use during replication.
 */
ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint);
项目:hbase    文件:ReplicationSourceInterface.java   
/**
 * @return the replication endpoint used by this replication source
 */
ReplicationEndpoint getReplicationEndpoint();
项目:hbase    文件:RegionServerObserver.java   
/**
 * This will be called after the replication endpoint is instantiated.
 * @param ctx the environment to interact with the framework and region server.
 * @param endpoint - the base endpoint for replication
 * @return the endpoint to use during replication.
 */
default ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}