/** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys * from compaction WAL edits and if the scope is local. * @param htd Descriptor used to find the scope to use * @param logKey Key that may get scoped according to its edits * @param logEdit Edits used to lookup the scopes */ public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); byte[] family; for (Cell cell : logEdit.getCells()) { family = cell.getFamily(); // This is expected and the KV should not be replicated if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue; // Unexpected, has a tendency to happen in unit tests assert htd.getFamily(family) != null; int scope = htd.getFamily(family).getScope(); if (scope != REPLICATION_SCOPE_LOCAL && !scopes.containsKey(family)) { scopes.put(family, scope); } } if (!scopes.isEmpty()) { logKey.setScopes(scopes); } }
@Override public void map(WALKey key, WALEdit value, Context context) throws IOException { try { // skip all other tables if (Bytes.equals(table, key.getTablename().getName())) { for (Cell cell : value.getCells()) { KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell); if (WALEdit.isMetaEditFamily(kv.getFamily())) continue; context.write(new ImmutableBytesWritable(kv.getRow()), kv); } } } catch (InterruptedException e) { e.printStackTrace(); } }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", justification="Will never be null") @Override public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException { if (this.closed) throw new IOException("Cannot append; log is closed"); // Make a trace scope for the append. It is closed on other side of the ring buffer by the // single consuming thread. Don't have to worry about it. TraceScope scope = Trace.startSpan("FSHLog.append"); // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need // all this to make a key and then below to append the edit, we need to carry htd, info, // etc. all over the ring buffer. FSWALEntry entry = null; long sequence = this.disruptor.getRingBuffer().next(); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); } return sequence; }
FSWALEntry(final long sequence, final WALKey key, final WALEdit edit, final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) { super(key, edit); this.inMemstore = inMemstore; this.htd = htd; this.hri = hri; this.sequence = sequence; if (inMemstore) { // construct familyNames here to reduce the work of log sinker. ArrayList<Cell> cells = this.getEdit().getCells(); if (CollectionUtils.isEmpty(cells)) { this.familyNames = Collections.<byte[]> emptySet(); } else { Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR); for (Cell cell : cells) { if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { familySet.add(CellUtil.cloneFamily(cell)); } } this.familyNames = Collections.unmodifiableSet(familySet); } } else { this.familyNames = Collections.<byte[]> emptySet(); } }
/** * Here is where a WAL edit gets its sequenceid. * @return The sequenceid we stamped on this edit. * @throws IOException */ long stampRegionSequenceId() throws IOException { long regionSequenceId = WALKey.NO_SEQUENCE_ID; MultiVersionConcurrencyControl mvcc = getKey().getMvcc(); MultiVersionConcurrencyControl.WriteEntry we = null; if (mvcc != null) { we = mvcc.begin(); regionSequenceId = we.getWriteNumber(); } if (!this.getEdit().isReplay() && inMemstore) { for (Cell c:getEdit().getCells()) { CellUtil.setSequenceId(c, regionSequenceId); } } // This has to stay in this order WALKey key = getKey(); key.setLogSeqNum(regionSequenceId); key.setWriteEntry(we); return regionSequenceId; }
private static long writeMarker(final WAL wal, final HTableDescriptor htd, final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync) throws IOException { // TODO: Pass in current time to use? WALKey key = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc); // Add it to the log but the false specifies that we don't need to add it to the memstore long trx = MultiVersionConcurrencyControl.NONE; try { trx = wal.append(htd, hri, key, edit, false); if (sync) wal.sync(trx); } finally { // If you get hung here, is it a real WAL or a mocked WAL? If the latter, you need to // trip the latch that is inside in getWriteEntry up in your mock. See down in the append // called from onEvent in FSHLog. MultiVersionConcurrencyControl.WriteEntry we = key.getWriteEntry(); if (mvcc != null && we != null) mvcc.complete(we); } return trx; }
/** * Method to safely get the next sequence number. * * @return Next sequence number unassociated with any actual edit. * @throws IOException */ @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { // TODO: For review. Putting an empty edit in to get a sequenceid out will // not work if the // WAL is banjaxed... if it has gotten an exception and the WAL has not yet // been rolled or // aborted. In this case, we'll just get stuck here. For now, until // HBASE-12751, just have // a timeout. May happen in tests after we tightened the semantic via // HBASE-14317. // Also, the getSequenceId blocks on a latch. There is no global list of // outstanding latches // so if an abort or stop, there is no way to call them in. WALKey key = this.appendEmptyEdit(wal); mvcc.complete(key.getWriteEntry()); return key.getSequenceId(this.maxWaitForSeqId); }
/** * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore * the WALEdit append later. * * @param wal * @return Return the key used appending with no sync and no append. * @throws IOException */ private WALKey appendEmptyEdit(final WAL wal) throws IOException { // we use HLogKey here instead of WALKey directly to support legacy // coprocessors. @SuppressWarnings("deprecation") WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC()); // Call append but with an empty WALEdit. The returned sequence id will not // be associated // with any edit and we can be sure it went in after all outstanding // appends. try { wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false); } catch (Throwable t) { // If exception, our mvcc won't get cleaned up by client, so do it here. getMVCC().complete(key.getWriteEntry()); } return key; }
/** * Constructor * @param impl the coprocessor instance * @param priority chaining priority */ public RegionEnvironment(final Coprocessor impl, final int priority, final int seq, final Configuration conf, final Region region, final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { super(impl, priority, seq, conf); this.region = region; this.rsServices = services; this.sharedData = sharedData; // Pick which version of the WAL related events we'll call. // This way we avoid calling the new version on older RegionObservers so // we can maintain binary compatibility. // See notes in javadoc for RegionObserver useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); }
/** * @param info * @param logKey * @param logEdit * @return true if default behavior should be bypassed, false otherwise * @throws IOException */ public boolean preWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { // Once we don't need to support the legacy call, replace RegionOperation with a version // that's ObserverContext<RegionEnvironment> and avoid this cast. final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment(); if (env.useLegacyPre) { if (logKey instanceof HLogKey) { oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit); } else { legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey."); } } else { oserver.preWALRestore(ctx, info, logKey, logEdit); } } }); }
/** * @param info * @param logKey * @param logEdit * @throws IOException */ public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { // Once we don't need to support the legacy call, replace RegionOperation with a version // that's ObserverContext<RegionEnvironment> and avoid this cast. final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment(); if (env.useLegacyPost) { if (logKey instanceof HLogKey) { oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit); } else { legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey."); } } else { oserver.postWALRestore(ctx, info, logKey, logEdit); } } }); }
@Test public void testSystemTableWALEntryFilter() { SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter(); // meta WALKey key1 = new WALKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), TableName.META_TABLE_NAME); Entry metaEntry = new Entry(key1, null); assertNull(filter.filter(metaEntry)); // ns table WALKey key2 = new WALKey(new byte[] {}, TableName.NAMESPACE_TABLE_NAME); Entry nsEntry = new Entry(key2, null); assertNull(filter.filter(nsEntry)); // user table WALKey key3 = new WALKey(new byte[] {}, TableName.valueOf("foo")); Entry userEntry = new Entry(key3, null); assertEquals(userEntry, filter.filter(userEntry)); }
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException { String familyStr = Bytes.toString(family); long txid = -1; for (int j = 0; j < count; j++) { byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care // about legacy coprocessors txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit, true); } if (-1 != txid) { wal.sync(txid); } }
/** * Utility method to setup a WAL mock. * Needs to do the bit where we close latch on the WALKey on append else test hangs. * @return * @throws IOException */ private WAL mockWAL() throws IOException { WAL wal = mock(WAL.class); Mockito.when(wal.append((HTableDescriptor)Mockito.any(), (HRegionInfo)Mockito.any(), (WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())). thenAnswer(new Answer<Long>() { @Override public Long answer(InvocationOnMock invocation) throws Throwable { WALKey key = invocation.getArgumentAt(2, WALKey.class); MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); key.setWriteEntry(we); return 1L; } }); return wal; }
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException { String familyStr = Bytes.toString(family); for (int j = 0; j < count; j++) { byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,999, mvcc), edit, true); } wal.sync(); }
protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times, MultiVersionConcurrencyControl mvcc) throws IOException { final byte[] row = Bytes.toBytes("row"); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc); log.append(htd, hri, key, cols, true); } log.sync(); }
@Test public void shouldBulkLoadSingleFamilyHLog() throws IOException { when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(boolean.class))).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { WALKey walKey = invocation.getArgumentAt(2, WALKey.class); MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); if (mvcc != null) { MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); walKey.setWriteEntry(we); } return 01L; }; }); testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null); verify(log).sync(anyLong()); }
@Test public void shouldBulkLoadManyFamilyHLog() throws IOException { when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(boolean.class))).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { WALKey walKey = invocation.getArgumentAt(2, WALKey.class); MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); if (mvcc != null) { MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); walKey.setWriteEntry(we); } return 01L; }; }); testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null); verify(log).sync(anyLong()); }
@Test public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException { when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(boolean.class))).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { WALKey walKey = invocation.getArgumentAt(2, WALKey.class); MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); if (mvcc != null) { MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); walKey.setWriteEntry(we); } return 01L; }; }); TableName tableName = TableName.valueOf("test", "test"); testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2) .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null); verify(log).sync(anyLong()); }
@Override public void map(WALKey key, WALEdit value, Context context) throws IOException { try { // skip all other tables if (Bytes.equals(table, key.getTablename().getName())) { for (Cell cell : value.getCells()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); if (WALEdit.isMetaEditFamily(kv.getFamily())) continue; context.write(new ImmutableBytesWritable(kv.getRow()), kv); } } } catch (InterruptedException e) { e.printStackTrace(); } }
/** * Constructor * @param impl the coprocessor instance * @param priority chaining priority */ public RegionEnvironment(final Coprocessor impl, final int priority, final int seq, final Configuration conf, final HRegion region, final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { super(impl, priority, seq, conf); this.region = region; this.rsServices = services; this.sharedData = sharedData; // Pick which version of the WAL related events we'll call. // This way we avoid calling the new version on older RegionObservers so // we can maintain binary compatibility. // See notes in javadoc for RegionObserver useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); }
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException { String familyStr = Bytes.toString(family); long txid = -1; for (int j = 0; j < count; j++) { byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care // about legacy coprocessors txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime()), edit, sequenceId, true, null); } if (-1 != txid) { wal.sync(txid); } }
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException { String familyStr = Bytes.toString(family); for (int j = 0; j < count; j++) { byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime()), edit, sequenceId, true, null); } wal.sync(); }
/** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from * compaction WAL edits and if the scope is local. * @param logKey Key that may get scoped according to its edits * @param logEdit Edits used to lookup the scopes * @throws IOException If failed to parse the WALEdit */ @VisibleForTesting static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { boolean replicationForBulkLoadEnabled = ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); boolean foundOtherEdits = false; for (Cell cell : logEdit.getCells()) { if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { foundOtherEdits = true; break; } } if (!foundOtherEdits && logEdit.getCells().size() > 0) { WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); if (maybeEvent != null && (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { // In serially replication, we use scopes when reading close marker. foundOtherEdits = true; } } if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { ((WALKeyImpl) logKey).serializeReplicationScope(false); } }
@Override public void postAppend(final long size, final long time, final WALKey logkey, final WALEdit logEdit) throws IOException { source.incrementAppendCount(); source.incrementAppendTime(time); source.incrementAppendSize(size); source.incrementWrittenBytes(size); if (time > 1000) { source.incrementSlowAppendCount(); LOG.warn(String.format("%s took %d ms appending an edit to wal; len~=%s", Thread.currentThread().getName(), time, StringUtils.humanReadableInt(size))); } }
@Override public void map(WALKey key, WALEdit value, Context context) throws IOException { try { // skip all other tables if (Bytes.equals(table, key.getTableName().getName())) { for (Cell cell : value.getCells()) { if (WALEdit.isMetaEditFamily(cell)) { continue; } context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), new MapReduceExtendedCell(cell)); } } } catch (InterruptedException e) { e.printStackTrace(); } }
/** * Create a new reader from the split, and match the edits against the passed columns. */ private void testSplit(InputSplit split, byte[]... columns) throws Exception { WALRecordReader<WALKey> reader = getReader(); reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); for (byte[] column : columns) { assertTrue(reader.nextKeyValue()); Cell cell = reader.getCurrentValue().getCells().get(0); if (!Bytes.equals(column, 0, column.length, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())) { assertTrue( "expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString( cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", false); } } assertFalse(reader.nextKeyValue()); reader.close(); }
/** * Constructor * @param implClass - not used * @param impl the coprocessor instance * @param priority chaining priority * @param seq load sequence * @param conf configuration * @param wal WAL */ public WALEnvironment(Class<?> implClass, final Coprocessor impl, final int priority, final int seq, final Configuration conf, final WAL wal) { super(impl, priority, seq, conf); this.wal = wal; // Pick which version of the API we'll call. // This way we avoid calling the new version on older WALObservers so // we can maintain binary compatibility. // See notes in javadoc for WALObserver useLegacyPre = useLegacyMethod(impl.getClass(), "preWALWrite", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); useLegacyPost = useLegacyMethod(impl.getClass(), "postWALWrite", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); }
/** * @param info * @param logKey * @param logEdit * @return true if default behavior should be bypassed, false otherwise * @throws IOException */ public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { boolean bypass = false; if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass; ObserverContext<WALCoprocessorEnvironment> ctx = null; List<WALEnvironment> envs = coprocessors.get(); for (int i = 0; i < envs.size(); i++) { WALEnvironment env = envs.get(i); if (env.getInstance() instanceof WALObserver) { final WALObserver observer = (WALObserver)env.getInstance(); ctx = ObserverContext.createAndPrepare(env, ctx); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); if (env.useLegacyPre) { if (logKey instanceof HLogKey) { observer.preWALWrite(ctx, info, (HLogKey)logKey, logEdit); } else { legacyWarning(observer.getClass(), "There are wal keys present that are not HLogKey."); } } else { observer.preWALWrite(ctx, info, logKey, logEdit); } } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } bypass |= ctx.shouldBypass(); if (ctx.shouldComplete()) { break; } } } return bypass; }
/** * @param info * @param logKey * @param logEdit * @throws IOException */ public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { if (this.coprocessors == null || this.coprocessors.isEmpty()) return; ObserverContext<WALCoprocessorEnvironment> ctx = null; List<WALEnvironment> envs = coprocessors.get(); for (int i = 0; i < envs.size(); i++) { WALEnvironment env = envs.get(i); if (env.getInstance() instanceof WALObserver) { final WALObserver observer = (WALObserver)env.getInstance(); ctx = ObserverContext.createAndPrepare(env, ctx); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); if (env.useLegacyPost) { if (logKey instanceof HLogKey) { observer.postWALWrite(ctx, info, (HLogKey)logKey, logEdit); } else { legacyWarning(observer.getClass(), "There are wal keys present that are not HLogKey."); } } else { observer.postWALWrite(ctx, info, logKey, logEdit); } } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } if (ctx.shouldComplete()) { break; } } } }
/** * Sanity check that we can move logs around while we are reading * from them. Should this test fail, ReplicationSource would have a hard * time reading logs that are being archived. * @throws Exception */ @Test public void testLogMoving() throws Exception{ Path logPath = new Path(logDir, "log"); if (!FS.exists(logDir)) FS.mkdirs(logDir); if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); for(int i = 0; i < 3; i++) { byte[] b = Bytes.toBytes(Integer.toString(i)); KeyValue kv = new KeyValue(b,b,b); WALEdit edit = new WALEdit(); edit.add(kv); WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); writer.append(new WAL.Entry(key, edit)); writer.sync(); } writer.close(); WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration()); WAL.Entry entry = reader.next(); assertNotNull(entry); Path oldLogPath = new Path(oldLogDir, "log"); FS.rename(logPath, oldLogPath); entry = reader.next(); assertNotNull(entry); entry = reader.next(); entry = reader.next(); assertNull(entry); reader.close(); }
/** * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out * the compaction WALEdit * @throws Exception */ @Test(timeout=300000) public void testCompactionWALEdits() throws Exception { WALProtos.CompactionDescriptor compactionDescriptor = WALProtos.CompactionDescriptor.getDefaultInstance(); HRegionInfo hri = new HRegionInfo(htable1.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit); }
private Entry createEntry(byte[]... kvs) { WALKey key1 = new WALKey(new byte[] {}, TableName.valueOf("foo")); WALEdit edit1 = new WALEdit(); for (byte[] kv : kvs) { edit1.add(new KeyValue(kv, kv, kv)); } return new Entry(key1, edit1); }
@Override public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { // only keep primary region's edits if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) { entries.add(new Entry(logKey, logEdit)); } }
@Override public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env, HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { String tableName = logKey.getTablename().getNameAsString(); if (tableName.equals(TABLE_SKIPPED)) { // skip recovery of TABLE_SKIPPED for testing purpose env.bypass(); return; } ctPreWALRestore.incrementAndGet(); }
@Override public boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env, HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { boolean bypass = false; // check table name matches or not. if (!Bytes.equals(info.getTableName(), this.tableName)) { return bypass; } preWALWriteCalled = true; // here we're going to remove one keyvalue from the WALEdit, and add // another one to it. List<Cell> cells = logEdit.getCells(); Cell deletedCell = null; for (Cell cell : cells) { // assume only one kv from the WALEdit matches. byte[] family = cell.getFamily(); byte[] qulifier = cell.getQualifier(); if (Arrays.equals(family, ignoredFamily) && Arrays.equals(qulifier, ignoredQualifier)) { LOG.debug("Found the KeyValue from WALEdit which should be ignored."); deletedCell = cell; } if (Arrays.equals(family, changedFamily) && Arrays.equals(qulifier, changedQualifier)) { LOG.debug("Found the KeyValue from WALEdit which should be changed."); cell.getValueArray()[cell.getValueOffset()] += 1; } } if (null != row) { cells.add(new KeyValue(row, addedFamily, addedQualifier)); } if (deletedCell != null) { LOG.debug("About to delete a KeyValue from WALEdit."); cells.remove(deletedCell); } return bypass; }