@Override public HLogKey getCurrentKey() throws IOException, InterruptedException { if (!(currentEntry.getKey() instanceof HLogKey)) { final IllegalStateException exception = new IllegalStateException( "HLogInputFormat only works when given entries that have HLogKey for keys. This" + " one had '" + currentEntry.getKey().getClass() + "'"); LOG.error("The deprecated HLogInputFormat has to work with the deprecated HLogKey class, " + " but HBase internals read the wal entry using some other class." + " This is a bug; please file an issue or email the developer mailing list. It is " + "likely that you would not have this problem if you updated to use WALInputFormat. " + "You will need the following exception details when seeking help from the HBase " + "community.", exception); throw exception; } return (HLogKey)currentEntry.getKey(); }
/** * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore * the WALEdit append later. * * @param wal * @return Return the key used appending with no sync and no append. * @throws IOException */ private WALKey appendEmptyEdit(final WAL wal) throws IOException { // we use HLogKey here instead of WALKey directly to support legacy // coprocessors. @SuppressWarnings("deprecation") WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC()); // Call append but with an empty WALEdit. The returned sequence id will not // be associated // with any edit and we can be sure it went in after all outstanding // appends. try { wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false); } catch (Throwable t) { // If exception, our mvcc won't get cleaned up by client, so do it here. getMVCC().complete(key.getWriteEntry()); } return key; }
/** * @param info * @param logKey * @param logEdit * @return true if default behavior should be bypassed, false otherwise * @throws IOException */ public boolean preWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { // Once we don't need to support the legacy call, replace RegionOperation with a version // that's ObserverContext<RegionEnvironment> and avoid this cast. final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment(); if (env.useLegacyPre) { if (logKey instanceof HLogKey) { oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit); } else { legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey."); } } else { oserver.preWALRestore(ctx, info, logKey, logEdit); } } }); }
/** * @param info * @param logKey * @param logEdit * @throws IOException */ public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { // Once we don't need to support the legacy call, replace RegionOperation with a version // that's ObserverContext<RegionEnvironment> and avoid this cast. final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment(); if (env.useLegacyPost) { if (logKey instanceof HLogKey) { oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit); } else { legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey."); } } else { oserver.postWALRestore(ctx, info, logKey, logEdit); } } }); }
@Override public void map(HLogKey key, WALEdit value, Context context) throws IOException { try { // skip all other tables if (Bytes.equals(table, key.getTablename())) { for (KeyValue kv : value.getKeyValues()) { if (HLog.isMetaFamily(kv.getFamily())) continue; context.write(new ImmutableBytesWritable(kv.getRow()), kv); } } } catch (InterruptedException e) { e.printStackTrace(); } }
/** * @param info * @param logKey * @param logEdit * @return true if default behavior should be bypassed, false otherwise * @throws IOException */ public boolean preWALRestore(HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { boolean bypass = false; ObserverContext<RegionCoprocessorEnvironment> ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey, logEdit); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } bypass |= ctx.shouldBypass(); if (ctx.shouldComplete()) { break; } } } return bypass; }
/** * @param info * @param logKey * @param logEdit * @throws IOException */ public void postWALRestore(HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { ObserverContext<RegionCoprocessorEnvironment> ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { ((RegionObserver)env.getInstance()).postWALRestore(ctx, info, logKey, logEdit); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } if (ctx.shouldComplete()) { break; } } } }
private void verifyRecoverEdits(final Path tableDir, final byte[] tableName, final Map<byte[], byte[]> regionsMap) throws IOException { for (FileStatus regionStatus: FSUtils.listStatus(fs, tableDir)) { assertTrue(regionStatus.getPath().getName().startsWith(Bytes.toString(tableName))); Path regionEdits = HLog.getRegionDirRecoveredEditsDir(regionStatus.getPath()); byte[] regionName = Bytes.toBytes(regionStatus.getPath().getName()); assertFalse(regionsMap.containsKey(regionName)); for (FileStatus logStatus: FSUtils.listStatus(fs, regionEdits)) { HLog.Reader reader = HLog.getReader(fs, logStatus.getPath(), conf); try { HLog.Entry entry; while ((entry = reader.next()) != null) { HLogKey key = entry.getKey(); assertArrayEquals(tableName, key.getTablename()); assertArrayEquals(regionName, key.getEncodedRegionName()); } } finally { reader.close(); } } } }
private void writeTestLog(final Path logFile) throws IOException { fs.mkdirs(logFile.getParent()); HLog.Writer writer = HLog.createWriter(fs, logFile, conf); try { for (int i = 0; i < 7; ++i) { byte[] tableName = getTableName(i); for (int j = 0; j < 10; ++j) { byte[] regionName = getRegionName(tableName, j); for (int k = 0; k < 50; ++k) { byte[] rowkey = Bytes.toBytes("row-" + k); HLogKey key = new HLogKey(regionName, tableName, (long)k, System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowkey, TEST_FAMILY, TEST_QUALIFIER, rowkey)); writer.append(new HLog.Entry(key, edit)); } } } } finally { writer.close(); } }
/** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys * from compaction WAL edits and if the scope is local. * @param htd Descriptor used to find the scope to use * @param logKey Key that may get scoped according to its edits * @param logEdit Edits used to lookup the scopes */ public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); byte[] family; for (KeyValue kv : logEdit.getKeyValues()) { family = kv.getFamily(); // This is expected and the KV should not be replicated if (kv.matchingFamily(WALEdit.METAFAMILY)) continue; // Unexpected, has a tendency to happen in unit tests assert htd.getFamily(family) != null; int scope = htd.getFamily(family).getScope(); if (scope != REPLICATION_SCOPE_LOCAL && !scopes.containsKey(family)) { scopes.put(family, scope); } } if (!scopes.isEmpty()) { logKey.setScopes(scopes); } }
@Override public void map(HLogKey key, WALEdit value, Context context) throws IOException { try { // skip all other tables if (Bytes.equals(table, key.getTablename().getName())) { for (KeyValue kv : value.getKeyValues()) { if (WALEdit.isMetaEditFamily(kv.getFamily())) continue; context.write(new ImmutableBytesWritable(kv.getRow()), kv); } } } catch (InterruptedException e) { e.printStackTrace(); } }
/** * @param info * @param logKey * @param logEdit * @return true if default behavior should be bypassed, false otherwise * @throws IOException */ public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) throws IOException { boolean bypass = false; ObserverContext<RegionCoprocessorEnvironment> ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey, logEdit); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } bypass |= ctx.shouldBypass(); if (ctx.shouldComplete()) { break; } } } return bypass; }
/** * @param info * @param logKey * @param logEdit * @throws IOException */ public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) throws IOException { ObserverContext<RegionCoprocessorEnvironment> ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); ((RegionObserver)env.getInstance()).postWALRestore(ctx, info, logKey, logEdit); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } if (ctx.shouldComplete()) { break; } } } }
private void verifyRecoverEdits(final Path tableDir, final TableName tableName, final Map<byte[], byte[]> regionsMap) throws IOException { for (FileStatus regionStatus: FSUtils.listStatus(fs, tableDir)) { assertTrue(regionStatus.getPath().getName().startsWith(tableName.getNameAsString())); Path regionEdits = HLogUtil.getRegionDirRecoveredEditsDir(regionStatus.getPath()); byte[] regionName = Bytes.toBytes(regionStatus.getPath().getName()); assertFalse(regionsMap.containsKey(regionName)); for (FileStatus logStatus: FSUtils.listStatus(fs, regionEdits)) { HLog.Reader reader = HLogFactory.createReader(fs, logStatus.getPath(), conf); try { HLog.Entry entry; while ((entry = reader.next()) != null) { HLogKey key = entry.getKey(); assertEquals(tableName, key.getTablename()); assertArrayEquals(regionName, key.getEncodedRegionName()); } } finally { reader.close(); } } } }
private void writeTestLog(final Path logFile) throws IOException { fs.mkdirs(logFile.getParent()); HLog.Writer writer = HLogFactory.createWALWriter(fs, logFile, conf); try { for (int i = 0; i < 7; ++i) { TableName tableName = getTableName(i); for (int j = 0; j < 10; ++j) { byte[] regionName = getRegionName(tableName, j); for (int k = 0; k < 50; ++k) { byte[] rowkey = Bytes.toBytes("row-" + k); HLogKey key = new HLogKey(regionName, tableName, (long)k, System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowkey, TEST_FAMILY, TEST_QUALIFIER, rowkey)); writer.append(new HLog.Entry(key, edit)); } } } } finally { writer.close(); } }
@Override public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); byte[] family; for (KeyValue kv : logEdit.getKeyValues()) { family = kv.getFamily(); int scope = htd.getFamily(family).getScope(); if (scope != REPLICATION_SCOPE_LOCAL && !scopes.containsKey(family)) { scopes.put(family, scope); } } if (!scopes.isEmpty()) { logEdit.setScopes(scopes); } }
/** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys * from compaction WAL edits and if the scope is local. * @param htd Descriptor used to find the scope to use * @param logKey Key that may get scoped according to its edits * @param logEdit Edits used to lookup the scopes */ public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); byte[] family; for (KeyValue kv : logEdit.getKeyValues()) { family = kv.getFamily(); // This is expected and the KV should not be replicated if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) continue; // Unexpected, has a tendency to happen in unit tests assert htd.getFamily(family) != null; int scope = htd.getFamily(family).getScope(); if (scope != REPLICATION_SCOPE_LOCAL && !scopes.containsKey(family)) { scopes.put(family, scope); } } if (!scopes.isEmpty()) { logKey.setScopes(scopes); } }
@Override public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { byte[] family; for (KeyValue kv : logEdit.getKeyValues()) { family = kv.getFamily(); int scope = htd.getFamily(family).getScope(); if (scope != REPLICATION_SCOPE_LOCAL && !logEdit.hasKeyInScope(family)) { logEdit.putIntoScope(family, scope); } } }
/** * Sanity check that we can move logs around while we are reading * from them. Should this test fail, ReplicationSource would have a hard * time reading logs that are being archived. * @throws Exception */ @Test public void testLogMoving() throws Exception{ Path logPath = new Path(logDir, "log"); if (!FS.exists(logDir)) FS.mkdirs(logDir); if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); HLog.Writer writer = HLog.createWriter(FS, logPath, conf); for(int i = 0; i < 3; i++) { byte[] b = Bytes.toBytes(Integer.toString(i)); KeyValue kv = new KeyValue(b,b,b); WALEdit edit = new WALEdit(); edit.add(kv); HLogKey key = new HLogKey(b, b, 0, 0, HConstants.DEFAULT_CLUSTER_ID); writer.append(new HLog.Entry(key, edit)); writer.sync(); } writer.close(); HLog.Reader reader = HLog.getReader(FS, logPath, conf); HLog.Entry entry = reader.next(); assertNotNull(entry); Path oldLogPath = new Path(oldLogDir, "log"); FS.rename(logPath, oldLogPath); entry = reader.next(); assertNotNull(entry); entry = reader.next(); entry = reader.next(); assertNull(entry); }
private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) { byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row); // Just make sure we don't get the same ts for two consecutive rows with // same key try { Thread.sleep(1); } catch (InterruptedException e) { LOG.info("Was interrupted while sleep, meh", e); } final long now = System.currentTimeMillis(); KeyValue kv = null; if(type.getCode() == KeyValue.Type.Put.getCode()) { kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row)); } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn); } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily); } HLogKey key = new HLogKey(table, table, now, now, HConstants.DEFAULT_CLUSTER_ID); WALEdit edit = new WALEdit(); edit.add(kv); return new HLog.Entry(key, edit); }
@Override public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { boolean bypass = false; // check table name matches or not. if (!Arrays.equals(HRegionInfo.getTableName(info.getRegionName()), this.tableName)) { return bypass; } preWALWriteCalled = true; // here we're going to remove one keyvalue from the WALEdit, and add // another one to it. List<KeyValue> kvs = logEdit.getKeyValues(); KeyValue deletedKV = null; for (KeyValue kv : kvs) { // assume only one kv from the WALEdit matches. byte[] family = kv.getFamily(); byte[] qulifier = kv.getQualifier(); if (Arrays.equals(family, ignoredFamily) && Arrays.equals(qulifier, ignoredQualifier)) { LOG.debug("Found the KeyValue from WALEdit which should be ignored."); deletedKV = kv; } if (Arrays.equals(family, changedFamily) && Arrays.equals(qulifier, changedQualifier)) { LOG.debug("Found the KeyValue from WALEdit which should be changed."); kv.getBuffer()[kv.getValueOffset()] += 1; } } kvs.add(new KeyValue(row, addedFamily, addedQualifier)); if (deletedKV != null) { LOG.debug("About to delete a KeyValue from WALEdit."); kvs.remove(deletedKV); } return bypass; }
/** * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore * the WALEdit append later. * * @param wal * @param cells list of Cells inserted into memstore. Those Cells are passed in order to * be updated with right mvcc values(their wal sequence number) * @return Return the key used appending with no sync and no append. * @throws IOException */ private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); // Call append but with an empty WALEdit. The returned seqeunce id will not be associated // with any edit and we can be sure it went in after all outstanding appends. wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells); return key; }