@Override public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException { if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) && replaySeqId < lastReplayedOpenRegionSeqId) { // if it is a secondary replica we should ignore these entries silently // since they are coming out of order if (LOG.isTraceEnabled()) { LOG.trace(getRegionInfo().getEncodedName() + " : " + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId); for (MutationReplay mut : mutations) { LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation); } } OperationStatus[] statuses = new OperationStatus[mutations.length]; for (int i = 0; i < statuses.length; i++) { statuses[i] = OperationStatus.SUCCESS; } return statuses; } return batchMutate(new ReplayBatch(mutations, replaySeqId)); }
public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException { if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) && replaySeqId < lastReplayedOpenRegionSeqId) { // if it is a secondary replica we should ignore these entries silently // since they are coming out of order if (LOG.isTraceEnabled()) { LOG.trace(getRegionInfo().getEncodedName() + " : " + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId); for (MutationReplay mut : mutations) { LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation); } } OperationStatus[] statuses = new OperationStatus[mutations.length]; for (int i = 0; i < statuses.length; i++) { statuses[i] = OperationStatus.SUCCESS; } return statuses; } return batchMutate(new ReplayBatchOperation(this, mutations, replaySeqId)); }
@Test public void testBatchReplayWithMultipleNonces() throws IOException { try { MutationReplay[] mutations = new MutationReplay[100]; for (int i = 0; i < 100; i++) { Put put = new Put(Bytes.toBytes(i)); put.setDurability(Durability.SYNC_WAL); for (byte[] familly : this.families) { put.addColumn(familly, this.cq, null); long nonceNum = i / 10; mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); } } primaryRegion.batchReplay(mutations, 20); } catch (Exception e) { String msg = "Error while replay of batch with multiple nonces. "; LOG.error(msg, e); fail(msg + e.getMessage()); } }
static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { return 0; // handled elsewhere } Put put = new Put(entry.getEdit().getCells().get(0).getRow()); for (Cell cell : entry.getEdit().getCells()) put.add(cell); put.setDurability(Durability.SKIP_WAL); MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); region.batchReplay(new MutationReplay[] {mutation}, entry.getKey().getLogSeqNum()); return Integer.parseInt(Bytes.toString(put.getRow())); }
static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { return 0; // handled elsewhere } Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); for (Cell cell : entry.getEdit().getCells()) put.add(cell); put.setDurability(Durability.SKIP_WAL); MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); region.batchReplay(new MutationReplay[] {mutation}, entry.getKey().getSequenceId()); return Integer.parseInt(Bytes.toString(put.getRow())); }
public ReplayBatch(MutationReplay[] operations, long seqId) { super(operations); this.replaySeqId = seqId; }
private void replay(HRegion region, Put put, long replaySeqId) throws IOException { put.setDurability(Durability.SKIP_WAL); MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); region.batchReplay(new MutationReplay[] {mutation}, replaySeqId); }
public ReplayBatchOperation(final HRegion region, MutationReplay[] operations, long origLogSeqNum) { super(region, operations); this.origLogSeqNum = origLogSeqNum; }
/** * Replay a batch of mutations. * @param mutations mutations to replay. * @param replaySeqId * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException;
/** * Replay a batch of mutations. * * @param mutations mutations to replay. * @param replaySeqId SeqId for current mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException { return batchMutate(new ReplayBatch(mutations, replaySeqId)); }