/**
 * Ships this task's batch of WAL entries to a replication sink obtained from
 * {@code replicationSinkMgr}.
 *
 * @return this task's {@code ordinal} after the batch was shipped and the sink was
 *         reported as successful
 * @throws IOException any I/O failure raised during the attempt; if a sink had already
 *         been obtained it is reported as bad before the exception is rethrown
 */
@Override
public Integer call() throws IOException {
  SinkPeer peer = null;
  try {
    peer = replicationSinkMgr.getReplicationSink();
    ReplicationProtbufUtil.replicateWALEntry(
        peer.getRegionServer(), entries.toArray(new Entry[entries.size()]));
    replicationSinkMgr.reportSinkSuccess(peer);
  } catch (IOException shippingFailure) {
    // Only a sink that was actually handed out can be blamed for the failure.
    if (peer != null) {
      replicationSinkMgr.reportBadSink(peer);
    }
    throw shippingFailure;
  }
  return ordinal;
}
/**
 * Sends a batch of WAL entries to the server at {@code getLocation()} through the admin
 * service's {@code replay} call.
 *
 * @param regionInfo not read by this method
 * @param entries WAL entries to replay; when empty the method returns without any RPC
 * @throws IOException on I/O failure while preparing or sending the request
 * @throws ServiceException declared by the signature; a {@code ServiceException} raised by
 *         the replay call itself is caught and rethrown as whatever
 *         {@code ProtobufUtil.getRemoteException} returns
 */
private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
    throws IOException, ServiceException {
  if (entries.isEmpty()) {
    return;
  }

  Entry[] batch = entries.toArray(new Entry[entries.size()]);
  AdminService.BlockingInterface admin = conn.getAdmin(getLocation().getServerName());
  Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> requestAndCells =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(batch);
  // The cell scanner half of the pair goes to the controller, the request half to the RPC.
  PayloadCarryingRpcController rpcController =
      rpcControllerFactory.newController(requestAndCells.getSecond());

  try {
    admin.replay(rpcController, requestAndCells.getFirst());
  } catch (ServiceException serviceFailure) {
    throw ProtobufUtil.getRemoteException(serviceFailure);
  }
}
/**
 * Sends a batch of WAL entries to the server at {@code getLocation()} through the admin
 * service's {@code replay} call, using a directly constructed
 * {@code PayloadCarryingRpcController}.
 *
 * @param regionInfo not read by this method
 * @param entries WAL entries to replay; when empty the method returns without any RPC
 * @throws IOException on I/O failure while preparing or sending the request
 * @throws ServiceException declared by the signature; a {@code ServiceException} raised
 *         inside the try block is caught and rethrown as whatever
 *         {@code ProtobufUtil.getRemoteException} returns
 */
private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
    throws IOException, ServiceException {
  if (entries.isEmpty()) {
    return;
  }

  HLog.Entry[] batch = entries.toArray(new HLog.Entry[entries.size()]);
  AdminService.BlockingInterface admin = conn.getAdmin(getLocation().getServerName());
  Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> requestAndCells =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(batch);

  try {
    // The cell scanner half of the pair goes to the controller, the request half to the RPC.
    PayloadCarryingRpcController rpcController =
        new PayloadCarryingRpcController(requestAndCells.getSecond());
    admin.replay(rpcController, requestAndCells.getFirst());
  } catch (ServiceException serviceFailure) {
    throw ProtobufUtil.getRemoteException(serviceFailure);
  }
}
/**
 * Sends a batch of WAL entries to the server at {@code getLocation()} through the admin
 * service's {@code replay} call, with the controller supplied by
 * {@code rpcControllerFactory}.
 *
 * @param regionInfo not read by this method
 * @param entries WAL entries to replay; when empty the method returns without any RPC
 * @throws IOException on I/O failure while preparing or sending the request
 * @throws ServiceException declared by the signature; a {@code ServiceException} raised by
 *         the replay call itself is caught and rethrown as whatever
 *         {@code ProtobufUtil.getRemoteException} returns
 */
private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
    throws IOException, ServiceException {
  if (entries.isEmpty()) {
    return;
  }

  HLog.Entry[] batch = entries.toArray(new HLog.Entry[entries.size()]);
  AdminService.BlockingInterface admin = conn.getAdmin(getLocation().getServerName());
  Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> requestAndCells =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(batch);
  // The cell scanner half of the pair goes to the controller, the request half to the RPC.
  PayloadCarryingRpcController rpcController =
      rpcControllerFactory.newController(requestAndCells.getSecond());

  try {
    admin.replay(rpcController, requestAndCells.getFirst());
  } catch (ServiceException serviceFailure) {
    throw ProtobufUtil.getRemoteException(serviceFailure);
  }
}
@Override public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception { // Check whether we should still replay this entry. If the regions are changed, or the // entry is not coming form the primary region, filter it out because we do not need it. // Regions can change because of (1) region split (2) region merge (3) table recreated boolean skip = false; if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), initialEncodedRegionName)) { skip = true; } if (!this.entries.isEmpty() && !skip) { Entry[] entriesArray = new Entry[this.entries.size()]; entriesArray = this.entries.toArray(entriesArray); // set the region name for the target region replica Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location .getRegionInfo().getEncodedNameAsBytes(), null, null, null); controller.setCellScanner(p.getSecond()); return stub.replay(controller, p.getFirst()); } if (skip) { if (LOG.isTraceEnabled()) { LOG.trace("Skipping " + entries.size() + " entries in table " + tableName + " because located region " + location.getRegionInfo().getEncodedName() + " is different than the original region " + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); for (Entry entry : entries) { LOG.trace("Skipping : " + entry); } } skippedEntries.addAndGet(entries.size()); } return ReplicateWALEntryResponse.newBuilder().build(); }
private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout) throws IOException { // check whether we should still replay this entry. If the regions are changed, or the // entry is not coming form the primary region, filter it out because we do not need it. // Regions can change because of (1) region split (2) region merge (3) table recreated boolean skip = false; if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), initialEncodedRegionName)) { skip = true; } if (!entries.isEmpty() && !skip) { Entry[] entriesArray = new Entry[entries.size()]; entriesArray = entries.toArray(entriesArray); // set the region name for the target region replica Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest( entriesArray, location.getRegionInfo().getEncodedNameAsBytes()); try { PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); controller.setCallTimeout(timeout); controller.setPriority(tableName); return stub.replay(controller, p.getFirst()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } } if (skip) { if (LOG.isTraceEnabled()) { LOG.trace("Skipping " + entries.size() + " entries in table " + tableName + " because located region " + location.getRegionInfo().getEncodedName() + " is different than the original region " + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); for (Entry entry : entries) { LOG.trace("Skipping : " + entry); } } skippedEntries.addAndGet(entries.size()); } return ReplicateWALEntryResponse.newBuilder().build(); }
/** * Do the shipping logic */ @Override public boolean replicate(ReplicateContext replicateContext) { List<Entry> entries = replicateContext.getEntries(); int sleepMultiplier = 1; while (this.isRunning()) { if (!peersSelected) { connectToPeers(); peersSelected = true; } if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { sleepMultiplier++; } continue; } SinkPeer sinkPeer = null; try { sinkPeer = replicationSinkMgr.getReplicationSink(); BlockingInterface rrs = sinkPeer.getRegionServer(); if (LOG.isTraceEnabled()) { LOG.trace("Replicating " + entries.size() + " entries of total size " + replicateContext.getSize()); } ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()])); // update metrics this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); return true; } catch (IOException ioe) { // Didn't ship anything, but must still age the last time we did this.metrics.refreshAgeOfLastShippedOp(); if (ioe instanceof RemoteException) { ioe = ((RemoteException) ioe).unwrapRemoteException(); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); if (ioe instanceof TableNotFoundException) { if (sleepForRetries("A table is missing in the peer cluster. " + "Replication cannot proceed without losing data.", sleepMultiplier)) { sleepMultiplier++; } } } else { if (ioe instanceof SocketTimeoutException) { // This exception means we waited for more than 60s and nothing // happened, the cluster is alive and calling it right away // even for a test just makes things worse. sleepForRetries("Encountered a SocketTimeoutException. 
Since the " + "call to the remote cluster timed out, which is usually " + "caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier); } else if (ioe instanceof ConnectException) { LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); replicationSinkMgr.chooseSinks(); } else { LOG.warn("Can't replicate because of a local or network error: ", ioe); } } if (sinkPeer != null) { replicationSinkMgr.reportBadSink(sinkPeer); } if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { sleepMultiplier++; } } } return false; // in case we exited before replicating }
/**
 * Forwards the given WAL entries to {@code sepConsumer} by delegating to
 * {@code ReplicationProtbufUtil.replicateWALEntry}.
 *
 * @param entries WAL entries to hand over, passed through unchanged
 * @throws IOException if the delegated call fails
 */
private void replicateWALEntry(WAL.Entry[] entries) throws IOException {
  ReplicationProtbufUtil.replicateWALEntry(sepConsumer, entries);
}