private ReplicationEndpoint createReplicationEndpoint() throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { RegionServerCoprocessorHost rsServerHost = null; if (server instanceof HRegionServer) { rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); } String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl(); if (replicationEndpointImpl == null) { // Default to HBase inter-cluster replication endpoint replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); } ReplicationEndpoint replicationEndpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance(); if (rsServerHost != null) { ReplicationEndpoint newReplicationEndPoint = rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); if (newReplicationEndPoint != null) { // Override the newly created endpoint from the hook with configured end point replicationEndpoint = newReplicationEndPoint; } } return replicationEndpoint; }
/**
 * Instantiation method used by region servers. Stores the supplied collaborators and reads
 * the replication tunables from the configuration.
 *
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param replicationQueues replication queues handle stored on this source
 * @param replicationPeers replication peers handle stored on this source
 * @param stopper the stopper object stored on this source
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for replication source
 * @throws IOException declared by the overridden method
 */
@Override
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics) throws IOException {
  this.stopper = stopper;
  // Build this source's own Configuration from the supplied one, then decorate it.
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();

  // Collaborators and identity handed in by the caller.
  this.manager = manager;
  this.fs = fs;
  this.metrics = metrics;
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.replicationEndpoint = replicationEndpoint;
  this.clusterId = clusterId;
  this.peerClusterZnode = peerClusterZnode;
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  this.peerId = this.replicationQueueInfo.getPeerId();

  // Tunables read from the decorated configuration.
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 64 * 1024 * 1024);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  // 1 second
  this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
  // 5 minutes @ 1 sec per
  this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.maxretriesmultiplier", 300);
  this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

  // The throttler is given one tenth of the configured per-peer bandwidth.
  final long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  this.throttler = new ReplicationThrottler(bandwidth / 10.0);
}
/**
 * Runs the {@code postCreateReplicationEndPoint} hook of the loaded region server observers.
 *
 * @param endpoint the endpoint that was just created; passed as the initial value to
 *        {@code execOperationWithResult}
 * @return whatever {@code execOperationWithResult} yields for this operation
 * @throws IOException if an observer hook or the execution helper fails
 */
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
    throws IOException {
  // With no coprocessors loaded a null operation is handed to the execution helper.
  CoprocessOperationWithResult<ReplicationEndpoint> operation = null;
  if (!coprocessors.isEmpty()) {
    operation = new CoprocessOperationWithResult<ReplicationEndpoint>() {
      @Override
      public void call(RegionServerObserver observer,
          ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
        // Feed the current result to the observer and keep its answer as the new result.
        setResult(observer.postCreateReplicationEndPoint(ctx, getResult()));
      }
    };
  }
  return execOperationWithResult(endpoint, operation);
}
@Test (timeout = 240000) public void testRegionReplicaReplicationEndpointReplicate() throws Exception { // tests replaying the edits to a secondary region replica using the RRRE.replicate() openRegion(HTU, rs0, hriSecondary); ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint(); ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); when(context.getMetrics()).thenReturn(mock(MetricsSource.class)); replicator.init(context); replicator.start(); //load some data to primary HTU.loadNumericRows(table, f, 0, 1000); Assert.assertEquals(1000, entries.size()); // replay the edits to the secondary using replay callable final String fakeWalGroupId = "fakeWALGroup"; replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)) .setWalGroupId(fakeWalGroupId)); Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName()); HTU.verifyNumericRows(region, f, 0, 1000); HTU.deleteNumericRows(table, f, 0, 1000); closeRegion(HTU, rs0, hriSecondary); connection.close(); }
/**
 * Initializes the given endpoint with a freshly built context, starts it and waits up to
 * {@code waitOnEndpointSeconds} seconds for it to be running.
 *
 * @param replicationEndpoint the endpoint to initialize and start
 * @throws IOException if endpoint initialization fails
 * @throws TimeoutException if the endpoint is not running within the wait period
 */
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
  // Table descriptors are only obtainable when the server is an HRegionServer.
  final TableDescriptors tableDescriptors = (server instanceof HRegionServer)
      ? ((HRegionServer) server).getTableDescriptors()
      : null;

  final ReplicationEndpoint.Context endpointContext = new ReplicationEndpoint.Context(
      conf, replicationPeer.getConfiguration(), fs, peerId, clusterId, replicationPeer,
      metrics, tableDescriptors, server);

  replicationEndpoint.init(endpointContext);
  replicationEndpoint.start();
  replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
/**
 * Runs the {@code postCreateReplicationEndPoint} hook of the loaded region server observers.
 *
 * @param endpoint the endpoint that was just created; used as the operation's initial result
 * @return {@code endpoint} itself when no coprocessor environments are loaded, otherwise the
 *         value produced by {@code execOperationWithResult}
 * @throws IOException if an observer hook or the execution helper fails
 */
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
    throws IOException {
  if (!this.coprocEnvironments.isEmpty()) {
    final ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint> operation =
        new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
            rsObserverGetter, endpoint) {
          @Override
          public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
            // The operation itself is passed as the observer context.
            return observer.postCreateReplicationEndPoint(this, getResult());
          }
        };
    return execOperationWithResult(operation);
  }
  // Nothing to consult: hand the endpoint straight back.
  return endpoint;
}
@Test (timeout = 240000) public void testRegionReplicaReplicationEndpointReplicate() throws Exception { // tests replaying the edits to a secondary region replica using the RRRE.replicate() openRegion(HTU, rs0, hriSecondary); ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint(); ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); when(context.getMetrics()).thenReturn(mock(MetricsSource.class)); replicator.init(context); replicator.startAsync(); //load some data to primary HTU.loadNumericRows(table, f, 0, 1000); Assert.assertEquals(1000, entries.size()); // replay the edits to the secondary using replay callable final String fakeWalGroupId = "fakeWALGroup"; replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)) .setWalGroupId(fakeWalGroupId)); Region region = rs0.getRegion(hriSecondary.getEncodedName()); HTU.verifyNumericRows(region, f, 0, 1000); HTU.deleteNumericRows(table, f, 0, 1000); closeRegion(HTU, rs0, hriSecondary); connection.close(); }
/** * Do the shipping logic * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) * written to when this method was called */ protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) { int sleepMultiplier = 0; if (entries.isEmpty()) { LOG.warn("Was given 0 edits to ship"); return; } while (isWorkerActive()) { try { if (throttler.isEnabled()) { long sleepTicks = throttler.getNextSleepInterval(currentSize); if (sleepTicks > 0) { try { if (LOG.isTraceEnabled()) { LOG.trace("To sleep " + sleepTicks + "ms for throttling control"); } Thread.sleep(sleepTicks); } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping for throttling control"); Thread.currentThread().interrupt(); // current thread might be interrupted to terminate // directly go back to while() for confirm this continue; } // reset throttler's cycle start tick when sleep for throttling occurs throttler.resetStartTick(); } } // create replicateContext here, so the entries can be GC'd upon return from this call // stack ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext(); replicateContext.setEntries(entries).setSize(currentSize); replicateContext.setWalGroupId(walGroupId); long startTimeNs = System.nanoTime(); // send the edits to the endpoint. 
Will block until the edits are shipped and acknowledged boolean replicated = replicationEndpoint.replicate(replicateContext); long endTimeNs = System.nanoTime(); if (!replicated) { continue; } else { sleepMultiplier = Math.max(sleepMultiplier - 1, 0); } if (this.lastLoggedPosition != this.repLogReader.getPosition()) { manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode, this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); this.lastLoggedPosition = this.repLogReader.getPosition(); } if (throttler.isEnabled()) { throttler.addPushSize(currentSize); } totalReplicatedEdits.addAndGet(entries.size()); totalReplicatedOperations.addAndGet(currentNbOperations); // FIXME check relationship between wal group and overall metrics.shipBatch(currentNbOperations, currentSize / 1024); metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); if (LOG.isTraceEnabled()) { LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or " + totalReplicatedOperations + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms"); } break; } catch (Exception ex) { LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + org.apache.hadoop.util.StringUtils.stringifyException(ex)); if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { sleepMultiplier++; } } } }
/**
 * Creates an endpoint that keeps a reference to another endpoint as its delegator together
 * with the visibility label service.
 *
 * @param endpoint the endpoint stored as {@code delegator}
 * @param visibilityLabelsService the label service stored on this instance
 */
public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
    VisibilityLabelService visibilityLabelsService) {
  this.visibilityLabelsService = visibilityLabelsService;
  this.delegator = endpoint;
}
/**
 * Wraps the newly created endpoint in a {@link VisibilityReplicationEndpoint} that is given
 * this observer's visibility label service.
 *
 * @param ctx observer context supplied by the caller (not used here)
 * @param endpoint the endpoint to wrap
 * @return a new {@code VisibilityReplicationEndpoint} built around {@code endpoint}
 */
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  final ReplicationEndpoint visibilityAware =
      new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
  return visibilityAware;
}
/**
 * Post-creation hook for the replication endpoint; this implementation leaves it untouched.
 *
 * @param ctx observer context supplied by the caller (not used here)
 * @param endpoint the endpoint that was created
 * @return the same {@code endpoint} instance that was passed in
 */
@Override public ReplicationEndpoint postCreateReplicationEndPoint( ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) { return endpoint; }
/**
 * Creates the test endpoint by forwarding both arguments unchanged to the superclass
 * constructor.
 *
 * @param endpoint the endpoint passed on to the superclass
 * @param visibilityLabelsService the label service passed on to the superclass
 */
public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint, VisibilityLabelService visibilityLabelsService) { super(endpoint, visibilityLabelsService); }
/** * Instantiation method used by region servers * * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver * @param peerClusterZnode the name of our znode * @param clusterId unique UUID for the cluster * @param replicationEndpoint the replication endpoint implementation * @param metrics metrics for replication source * @throws IOException */ @Override public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, final MetricsSource metrics) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); decorateConf(); this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queue = new PriorityBlockingQueue<Path>( this.conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; this.fs = fs; this.metrics = metrics; this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf); this.clusterId = clusterId; this.peerClusterZnode = peerClusterZnode; this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo 
parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; this.replicateContext = new ReplicationEndpoint.ReplicateContext(); }
/**
 * Returns the replication endpoint currently held in this object's
 * {@code replicationEndpoint} field.
 *
 * @return the stored replication endpoint
 */
@Override public ReplicationEndpoint getReplicationEndpoint() { return this.replicationEndpoint; }
/**
 * Post-creation hook for the replication endpoint; this implementation does not replace it.
 *
 * <p>The hook's contract is to return the endpoint to use during replication, so the
 * endpoint is handed back rather than {@code null}. When several observers are chained, each
 * one's return value becomes the next one's input, so a {@code null} here would discard an
 * endpoint installed by an earlier observer.
 *
 * @param ctx observer context supplied by the caller (not used here)
 * @param endpoint the endpoint that was created, possibly already replaced by another observer
 * @return the same {@code endpoint} instance that was passed in
 */
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  return endpoint;
}
/**
 * Post-creation hook for the replication endpoint; this implementation leaves it untouched.
 *
 * @param ctx observer context supplied by the caller (not used here)
 * @param endpoint the endpoint that was created
 * @return the same {@code endpoint} instance that was passed in
 */
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) { return endpoint; }
/**
 * Initializer for the source.
 *
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the replication queues made available to the source
 *        (NOTE(review): exact role is not documented here; confirm against implementations)
 * @param replicationPeers the replication peers made available to the source
 *        (NOTE(review): exact role is not documented here; confirm against implementations)
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode the znode name for the peer cluster
 * @param clusterId the cluster's UUID
 * @param replicationEndpoint the replication endpoint the source should use
 * @param metrics the metrics object for this source
 * @throws IOException if initialization fails (conditions are implementation-specific)
 */
public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, final MetricsSource metrics) throws IOException;
/**
 * This will be called after the replication endpoint is instantiated.
 *
 * @param ctx the observer context passed to this hook
 * @param endpoint the base endpoint for replication
 * @return the endpoint to use during replication
 */
ReplicationEndpoint postCreateReplicationEndPoint( ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint);
/**
 * Returns the replication endpoint used by this replication source.
 *
 * @return the replication endpoint used by this replication source
 */
ReplicationEndpoint getReplicationEndpoint();
/**
 * This will be called after the replication endpoint is instantiated. The default
 * implementation returns {@code endpoint} unchanged.
 *
 * @param ctx the environment to interact with the framework and region server.
 * @param endpoint the base endpoint for replication
 * @return the endpoint to use during replication.
 */
default ReplicationEndpoint postCreateReplicationEndPoint( ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) { return endpoint; }