@Override public Entry filter(Entry entry) { NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes(); if (scopes == null || scopes.isEmpty()) { return null; } ArrayList<Cell> cells = entry.getEdit().getCells(); int size = cells.size(); for (int i = size - 1; i >= 0; i--) { Cell cell = cells.get(i); // The scope will be null or empty if // there's nothing to replicate in that WALEdit if (!scopes.containsKey(cell.getFamily()) || scopes.get(cell.getFamily()) == HConstants.REPLICATION_SCOPE_LOCAL) { cells.remove(i); } } if (cells.size() < size / 2) { cells.trimToSize(); } return entry; }
/**
 * Ships this batch of entries to a sink obtained from replicationSinkMgr.
 *
 * On success the sink is reported as healthy. On an IOException the sink, if one was obtained,
 * is reported as bad and the exception is rethrown.
 *
 * @return this batch's ordinal
 */
@Override
public Integer call() throws IOException {
  SinkPeer peer = null;
  try {
    peer = replicationSinkMgr.getReplicationSink();
    BlockingInterface sinkServer = peer.getRegionServer();
    Entry[] batch = entries.toArray(new Entry[entries.size()]);
    ReplicationProtbufUtil.replicateWALEntry(sinkServer, batch);
    replicationSinkMgr.reportSinkSuccess(peer);
    return ordinal;
  } catch (IOException ioe) {
    // Only blame a sink if we actually got as far as choosing one.
    if (peer != null) {
      replicationSinkMgr.reportBadSink(peer);
    }
    throw ioe;
  }
}
@Override public void append(RegionEntryBuffer buffer) throws IOException { List<Entry> entries = buffer.getEntryBuffer(); if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) { return; } // meta edits (e.g. flush) are always replicated. // data edits (e.g. put) are replicated if the table requires them. if (!requiresReplication(buffer.getTableName(), entries)) { return; } sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), entries.get(0).getEdit().getCells().get(0).getRow(), entries); }
/** * Append a log entry into the corresponding region buffer. * Blocks if the total heap usage has crossed the specified threshold. * * @throws InterruptedException * @throws IOException */ public void appendEntry(Entry entry) throws InterruptedException, IOException { WALKey key = entry.getKey(); RegionEntryBuffer buffer; long incrHeap; synchronized (this) { buffer = buffers.get(key.getEncodedRegionName()); if (buffer == null) { buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName()); buffers.put(key.getEncodedRegionName(), buffer); } incrHeap= buffer.appendEntry(entry); } // If we crossed the chunk threshold, wait for more space to be available synchronized (controller.dataAvailable) { totalBuffered += incrHeap; while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads..."); controller.dataAvailable.wait(2000); } controller.dataAvailable.notifyAll(); } controller.checkForErrors(); }
/**
 * Picks the largest buffer whose region is not already being written. The chosen buffer is
 * removed from {@code buffers}, and its region is added to {@code currentlyWriting}.
 *
 * @return the chosen buffer of edits to be written or replayed, or null if no region outside
 *   currentlyWriting has a buffer with a positive heap size
 */
synchronized RegionEntryBuffer getChunkToWrite() {
  byte[] chosenKey = null;
  long chosenSize = 0;
  for (Map.Entry<byte[], RegionEntryBuffer> candidate : buffers.entrySet()) {
    byte[] regionKey = candidate.getKey();
    if (currentlyWriting.contains(regionKey)) {
      continue;
    }
    long candidateSize = candidate.getValue().heapSize();
    if (candidateSize > chosenSize) {
      chosenSize = candidateSize;
      chosenKey = regionKey;
    }
  }
  if (chosenKey == null) {
    return null;
  }
  RegionEntryBuffer chosen = buffers.remove(chosenKey);
  currentlyWriting.add(chosenKey);
  return chosen;
}
/** * Get a writer and path for a log starting at the given entry. This function is threadsafe so * long as multiple threads are always acting on different regions. * @return null if this region shouldn't output any logs */ private WriterAndPath getWriterAndPath(Entry entry) throws IOException { byte region[] = entry.getKey().getEncodedRegionName(); WriterAndPath ret = (WriterAndPath) writers.get(region); if (ret != null) { return ret; } // If we already decided that this region doesn't get any output // we don't need to check again. if (blacklistedRegions.contains(region)) { return null; } ret = createWAP(region, entry, rootDir); if (ret == null) { blacklistedRegions.add(region); return null; } writers.put(region, ret); return ret; }
/**
 * Opens a writer on the split-edits path for the entry's region, first deleting any file already
 * present at that path. The caller is responsible for closing the returned writer.
 *
 * @return the path together with its writer, or null if getRegionSplitEditsPath gave no path
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
  Path editsPath = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (editsPath == null) {
    return null;
  }
  // An existing file here may be left over from a previous failed split attempt.
  if (fs.exists(editsPath)) {
    long staleLength = fs.getFileStatus(editsPath).getLen();
    LOG.warn("Found old edits file. It could be the result of a previous failed split attempt. "
        + "Deleting " + editsPath + ", length=" + staleLength);
    boolean deleted = fs.delete(editsPath, false);
    if (!deleted) {
      LOG.warn("Failed delete of old " + editsPath);
    }
  }
  Writer writer = createWriter(editsPath);
  LOG.debug("Creating writer path=" + editsPath);
  return new WriterAndPath(editsPath, writer);
}
private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions) throws IOException { RegionServerWriter rsw = null; long startTime = System.nanoTime(); try { rsw = getRegionServerWriter(key); rsw.sink.replayEntries(actions); // Pass along summary statistics rsw.incrementEdits(actions.size()); rsw.incrementNanoTime(System.nanoTime() - startTime); } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); LOG.fatal(" Got while writing log entry to log", e); throw e; } }
/**
 * Replays the given entries against the region at regionLoc using a retrying RPC caller, with
 * this.replayTimeout as the call timeout.
 *
 * @throws IOException if the replay fails and skipErrors is false; when skipErrors is true the
 *   failure is logged as a warning, including its stack trace, and swallowed
 */
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
    final List<Entry> entries) throws IOException {
  try {
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
    ReplayServerCallable<ReplicateWALEntryResponse> callable =
        new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
            regionInfo, entries);
    factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
  } catch (IOException ie) {
    if (!skipErrors) {
      throw ie;
    }
    // Log the exception itself, not just its message, so the cause and stack trace survive.
    LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
        + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
  }
}
/**
 * Sends the entries to the server at getLocation() via the admin replay RPC. The cell payload
 * travels in the RPC controller rather than in the request message. Does nothing for an empty list.
 *
 * @throws IOException if the remote call fails (ServiceExceptions are converted via
 *   ProtobufUtil.getRemoteException)
 */
private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
    throws IOException, ServiceException {
  if (entries.isEmpty()) {
    return;
  }
  Entry[] batch = entries.toArray(new Entry[entries.size()]);
  AdminService.BlockingInterface server = conn.getAdmin(getLocation().getServerName());
  Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> requestAndCells =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(batch);
  PayloadCarryingRpcController rpcController =
      rpcControllerFactory.newController(requestAndCells.getSecond());
  try {
    server.replay(rpcController, requestAndCells.getFirst());
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
@Override public void prepare(boolean reload) throws IOException { if (!reload) return; // relocate regions in case we have a new dead server or network hiccup // if not due to connection issue, the following code should run fast because it uses // cached location boolean skip = false; for (Entry entry : this.entries) { WALEdit edit = entry.getEdit(); List<Cell> cells = edit.getCells(); for (Cell cell : cells) { // filtering WAL meta entries setLocation(conn.locateRegion(tableName, cell.getRow())); skip = true; break; } // use first log entry to relocate region because all entries are for one region if (skip) break; } }
/**
 * Chains the superclass filter with one that strips every cell whose row differs from
 * {@code row}.
 */
@Override
public WALEntryFilter getWALEntryfilter() {
  WALEntryFilter rowOnlyFilter = new WALEntryFilter() {
    /** Removes non-matching cells in place; the entry itself is always returned. */
    @Override
    public Entry filter(Entry entry) {
      ArrayList<Cell> cells = entry.getEdit().getCells();
      int idx = cells.size();
      // Iterate backwards so removals do not disturb indices not yet visited.
      while (--idx >= 0) {
        Cell candidate = cells.get(idx);
        boolean sameRow = Bytes.equals(candidate.getRowArray(), candidate.getRowOffset(),
            candidate.getRowLength(), row, 0, row.length);
        if (!sameRow) {
          cells.remove(idx);
        }
      }
      return entry;
    }
  };
  return new ChainWALEntryFilter(super.getWALEntryfilter(), rowOnlyFilter);
}
@Test public void testSystemTableWALEntryFilter() { SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter(); // meta WALKey key1 = new WALKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), TableName.META_TABLE_NAME); Entry metaEntry = new Entry(key1, null); assertNull(filter.filter(metaEntry)); // ns table WALKey key2 = new WALKey(new byte[] {}, TableName.NAMESPACE_TABLE_NAME); Entry nsEntry = new Entry(key2, null); assertNull(filter.filter(nsEntry)); // user table WALKey key3 = new WALKey(new byte[] {}, TableName.valueOf("foo")); Entry userEntry = new Entry(key3, null); assertEquals(userEntry, filter.filter(userEntry)); }
private void assertEquals(Entry e1, Entry e2) { Assert.assertEquals(e1 == null, e2 == null); if (e1 == null) { return; } // do not compare WALKeys // compare kvs Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null); if (e1.getEdit() == null) { return; } List<Cell> cells1 = e1.getEdit().getCells(); List<Cell> cells2 = e2.getEdit().getCells(); Assert.assertEquals(cells1.size(), cells2.size()); for (int i = 0; i < cells1.size(); i++) { KeyValue.COMPARATOR.compare(cells1.get(i), cells2.get(i)); } }
/**
 * Drains the queue, replaying each entry on its own. Each entry goes to the location at replica
 * index 1 of the region holding its first cell's row. A retrying caller is used, with a call
 * timeout of 10000 (presumably ms).
 */
private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
    throws IOException, RuntimeException {
  for (Entry next = entries.poll(); next != null; next = entries.poll()) {
    byte[] row = next.getEdit().getCells().get(0).getRow();
    RegionLocations locations = connection.locateRegion(tableName, row, true, true);
    RpcControllerFactory controllerFactory =
        RpcControllerFactory.instantiate(connection.getConfiguration());
    RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
        controllerFactory, table.getName(), locations.getRegionLocation(1),
        locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(next),
        new AtomicLong());
    RpcRetryingCallerFactory callerFactory =
        RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
    callerFactory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
  }
}
/** * Append a log entry into the corresponding region buffer. * Blocks if the total heap usage has crossed the specified threshold. * * @throws InterruptedException * @throws IOException */ void appendEntry(Entry entry) throws InterruptedException, IOException { WALKey key = entry.getKey(); RegionEntryBuffer buffer; long incrHeap; synchronized (this) { buffer = buffers.get(key.getEncodedRegionName()); if (buffer == null) { buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName()); buffers.put(key.getEncodedRegionName(), buffer); } incrHeap= buffer.appendEntry(entry); } // If we crossed the chunk threshold, wait for more space to be available synchronized (dataAvailable) { totalBuffered += incrHeap; while (totalBuffered > maxHeapUsage && thrown.get() == null) { LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads..."); dataAvailable.wait(2000); } dataAvailable.notifyAll(); } checkForErrors(); }
/**
 * Opens a writer on the split-edits path for the entry's region, first deleting any file already
 * present at that path. The caller is responsible for closing the returned writer.
 *
 * @return the path together with its writer, or null if getRegionSplitEditsPath gave no path
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
  Path editsPath = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (editsPath == null) {
    return null;
  }
  // An existing file here may be left over from a previous failed split attempt.
  if (fs.exists(editsPath)) {
    long staleLength = fs.getFileStatus(editsPath).getLen();
    LOG.warn("Found old edits file. It could be the result of a previous failed split attempt. "
        + "Deleting " + editsPath + ", length=" + staleLength);
    boolean deleted = fs.delete(editsPath, false);
    if (!deleted) {
      LOG.warn("Failed delete of old " + editsPath);
    }
  }
  Writer writer = createWriter(editsPath);
  LOG.info("Creating writer path=" + editsPath + " region=" + Bytes.toStringBinary(region));
  return new WriterAndPath(editsPath, writer);
}
/** * Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived. * It compares the region entries present in the passed sequenceNums map with the local copy of * {@link #oldestUnflushedRegionSequenceIds} and {@link #lowestFlushingRegionSequenceIds}. If, * for all regions, the value is lesser than the minimum of values present in the * oldestFlushing/UnflushedSeqNums, then the wal file is eligible for archiving. * @param sequenceNums for a WAL, at the time when it was rolled. * @param oldestFlushingMap * @param oldestUnflushedMap * @return true if wal is eligible for archiving, false otherwise. */ static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums, Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) { for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) { // find region entries in the flushing/unflushed map. If there is no entry, it meansj // a region doesn't have any unflushed entry. long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ? oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE; long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ? oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE; // do a minimum to be sure to contain oldest sequence Id long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed); if (minSeqNum <= regionSeqIdEntry.getValue()) return false;// can't archive } return true; }
/** * Iterates over the given map of regions, and compares their sequence numbers with corresponding * entries in {@link #oldestUnflushedRegionSequenceIds}. If the sequence number is greater or * equal, the region is eligible to flush, otherwise, there is no benefit to flush (from the * perspective of passed regionsSequenceNums map), because the region has already flushed the * entries present in the WAL file for which this method is called for (typically, the oldest * wal file). * @param regionsSequenceNums * @return regions which should be flushed (whose sequence numbers are larger than their * corresponding un-flushed entries. */ private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) { List<byte[]> regionsToFlush = null; // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. synchronized (regionSequenceIdLock) { for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) { Long unFlushedVal = this.oldestUnflushedRegionSequenceIds.get(e.getKey()); if (unFlushedVal != null && unFlushedVal <= e.getValue()) { if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>(); regionsToFlush.add(e.getKey()); } } } return regionsToFlush == null ? null : regionsToFlush .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY }); }
/**
 * If there are more rolled WAL files than maxLogs allows, returns the regions that should be
 * flushed so that the oldest WAL file can be archived, and logs them.
 *
 * @return regions to flush in order to archive the oldest WAL file, or null if the WAL count is
 *   within limits, no WAL is currently tracked, or no region needs flushing
 * @throws IOException
 */
byte[][] findRegionsToForceFlush() throws IOException {
  int logCount = getNumRolledLogFiles();
  if (logCount <= this.maxLogs || logCount <= 0) {
    return null;
  }
  // firstEntry() returns null on an empty map (e.g. if it changed after the count was taken);
  // there is then no oldest WAL to archive.
  Map.Entry<Path, Map<byte[], Long>> firstWALEntry = this.byWalRegionSequenceIds.firstEntry();
  if (firstWALEntry == null) {
    return null;
  }
  byte[][] regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
  if (regions == null) {
    return null;
  }
  StringBuilder sb = new StringBuilder();
  for (int i = 0; i < regions.length; i++) {
    if (i > 0) {
      sb.append(", ");
    }
    sb.append(Bytes.toStringBinary(regions[i]));
  }
  LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" + this.maxLogs + "; forcing flush of "
      + regions.length + " region(s): " + sb.toString());
  return regions;
}
/**
 * Checks that the split-edits path computed for a meta entry is placed under the meta region's
 * directory, i.e. its grandparent directory is named after the meta region's encoded name.
 *
 * @throws IOException if a filesystem operation fails
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-3020">HBASE-3020</a>
 */
@Test (timeout=300000)
public void testRecoveredEditsPathForMeta() throws IOException {
  byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
  Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
  Path regiondir = new Path(tdir, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
  fs.mkdirs(regiondir);
  long now = System.currentTimeMillis();
  Entry entry = new Entry(new WALKey(encoded, TableName.META_TABLE_NAME, 1, now,
      HConstants.DEFAULT_CLUSTER_ID), new WALEdit());
  Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
  String parentOfParent = p.getParent().getParent().getName();
  // JUnit's assertEquals takes (expected, actual); keep that order for accurate failure messages.
  assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
}
/** * Test old recovered edits file doesn't break WALSplitter. * This is useful in upgrading old instances. */ @Test (timeout=300000) public void testOldRecoveredEditsFileSidelined() throws IOException { byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); Path regiondir = new Path(tdir, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); fs.mkdirs(regiondir); long now = System.currentTimeMillis(); Entry entry = new Entry(new WALKey(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); assertEquals(parent.getName(), HConstants.RECOVERED_EDITS_DIR); fs.createNewFile(parent); // create a recovered.edits file Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); }
/**
 * Compares two WAL files entry by entry. Keys must compare as 0, and edits must have equal
 * toString() output. Both files must also contain the same number of entries.
 * Both readers are closed on every exit path.
 *
 * @return true if the two logs hold equivalent entries in the same order
 * @throws IOException if either file cannot be opened or read
 */
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
  Reader in1 = wals.createReader(fs, p1);
  try {
    Reader in2 = wals.createReader(fs, p2);
    try {
      Entry entry1;
      while ((entry1 = in1.next()) != null) {
        Entry entry2 = in2.next();
        // p2 running out before p1 means the logs differ (previously an NPE).
        if (entry2 == null
            || entry1.getKey().compareTo(entry2.getKey()) != 0
            || !entry1.getEdit().toString().equals(entry2.getEdit().toString())) {
          return false;
        }
      }
      // p1 is exhausted; the logs are equal only if p2 has no trailing entries either.
      return in2.next() == null;
    } finally {
      in2.close();
    }
  } finally {
    in1.close();
  }
}
@Override public Entry filter(Entry entry) { // don't replicate if the log entries have already been consumed by the cluster if (replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(peerClusterId)) { WALEdit edit = entry.getEdit(); WALKeyImpl logKey = (WALKeyImpl)entry.getKey(); if (edit != null && !edit.isEmpty()) { // Mark that the current cluster has the change logKey.addClusterId(clusterId); // We need to set the CC to null else it will be compressed when sent to the sink entry.setCompressionContext(null); return entry; } } return null; }
/**
 * Runs every cell of the entry's edit through cellFilters in order. A filter may return a
 * replacement cell, which is written back into the edit. If it returns null, the cell is removed
 * and the remaining filters are skipped for that cell. Does nothing for a null entry or when no
 * cell filters are configured.
 */
private void filterCells(Entry entry) {
  if (entry == null || cellFilters.length == 0) {
    return;
  }
  ArrayList<Cell> cells = entry.getEdit().getCells();
  int originalCount = cells.size();
  // Walk backwards so removing a cell does not disturb indices not yet visited.
  for (int idx = originalCount - 1; idx >= 0; idx--) {
    Cell current = cells.get(idx);
    for (WALCellFilter cellFilter : cellFilters) {
      current = cellFilter.filterCell(entry, current);
      if (current == null) {
        cells.remove(idx);
        break;
      }
      cells.set(idx, current);
    }
  }
  // Release spare capacity once fewer than half (size / 2) of the original cells remain.
  if (cells.size() < originalCount / 2) {
    cells.trimToSize();
  }
}
/** * Returns whether the file is opened for writing. */ private boolean readNextEntryAndSetPosition() throws IOException { Entry readEntry = reader.next(); long readerPos = reader.getPosition(); OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted // data, so we need to make sure that we do not read beyond the committed file length. if (LOG.isDebugEnabled()) { LOG.debug("The provider tells us the valid length for " + currentPath + " is " + fileLength.getAsLong() + ", but we have advanced to " + readerPos); } resetReader(); return true; } if (readEntry != null) { metrics.incrLogEditsRead(); metrics.incrLogReadInBytes(readerPos - currentPosition); } currentEntry = readEntry; // could be null setPosition(readerPos); return fileLength.isPresent(); }
/**
 * Builds a human-readable summary. It starts with the total number of replicated edits and then
 * gives one line per WAL group: either the path and position currently being replicated, or a
 * note that the group is waiting for a new log.
 */
@Override
public String getStats() {
  StringBuilder sb = new StringBuilder();
  sb.append("Total replicated edits: ").append(totalReplicatedEdits)
      .append(", current progress: \n");
  for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
    String walGroupId = entry.getKey();
    ReplicationSourceShipper worker = entry.getValue();
    long position = worker.getCurrentPosition();
    Path currentPath = worker.getCurrentPath();
    sb.append("walGroup [").append(walGroupId).append("]: ");
    if (currentPath != null) {
      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(position).append("\n");
    } else {
      // Terminate this line as well, otherwise the next walGroup runs onto the same line.
      sb.append("no replication ongoing, waiting for new log").append("\n");
    }
  }
  return sb.toString();
}
/** * @return true if we should stop reading because we're at REGION_CLOSE */ private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException { if (entry.hasSerialReplicationScope()) { String key = Bytes.toString(entry.getKey().getEncodedRegionName()); batch.setLastPosition(key, entry.getKey().getSequenceId()); if (!entry.getEdit().getCells().isEmpty()) { WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); if (maybeEvent != null && maybeEvent .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) { // In serially replication, if we move a region to another RS and move it back, we may // read logs crossing two sections. We should break at REGION_CLOSE and push the first // section first in case of missing the middle section belonging to the other RS. // In a worker thread, if we can push the first log of a region, we can push all logs // in the same region without waiting until we read a close marker because next time // we read logs in this region, it must be a new section and not adjacent with this // region. Mark it negative. batch.setLastPosition(key, -entry.getKey().getSequenceId()); return true; } } } return false; }
@Override public void append(RegionEntryBuffer buffer) throws IOException { List<Entry> entries = buffer.getEntryBuffer(); if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) { return; } // meta edits (e.g. flush) are always replicated. // data edits (e.g. put) are replicated if the table requires them. if (!requiresReplication(buffer.getTableName(), entries)) { return; } sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries); }
/** * Append a log entry into the corresponding region buffer. * Blocks if the total heap usage has crossed the specified threshold. * * @throws InterruptedException * @throws IOException */ public void appendEntry(Entry entry) throws InterruptedException, IOException { WALKey key = entry.getKey(); RegionEntryBuffer buffer; long incrHeap; synchronized (this) { buffer = buffers.get(key.getEncodedRegionName()); if (buffer == null) { buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName()); buffers.put(key.getEncodedRegionName(), buffer); } incrHeap= buffer.appendEntry(entry); } // If we crossed the chunk threshold, wait for more space to be available synchronized (controller.dataAvailable) { totalBuffered += incrHeap; while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered); controller.dataAvailable.wait(2000); } controller.dataAvailable.notifyAll(); } controller.checkForErrors(); }