/** * Opens the file at the current position * @param path * @return an WAL reader. * @throws IOException */ public Reader openReader(Path path) throws IOException { // Detect if this is a new file, if so get a new reader else // reset the current reader so that we see the new data if (this.reader == null || !this.lastPath.equals(path)) { this.closeReader(); this.reader = WALFactory.createReader(this.fs, path, this.conf); this.lastPath = path; } else { try { this.reader.reset(); } catch (NullPointerException npe) { throw new IOException("NPE resetting reader, likely HDFS-4380", npe); } } return this.reader; }
/** * @param conf must not be null, will keep a reference to read params in later reader/writer * instances. * @param listeners may be null. will be given to all created wals (and not meta-wals) * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations * to make a directory */ public WALFactory(final Configuration conf, final List<WALActionsListener> listeners, final String factoryId) throws IOException { // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); /* TODO Both of these are probably specific to the fs wal provider */ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, DefaultWALProvider.Reader.class); this.conf = conf; this.factoryId = factoryId; // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null); } else { // special handling of existing configuration behavior. LOG.warn("Running with WAL disabled."); provider = new DisabledWALProvider(); provider.init(this, conf, null, factoryId); } }
@Override public void setUpCluster() throws Exception { util = getTestingUtil(null); Configuration conf = util.getConfiguration(); if (!util.isDistributedCluster()) { // Inject required configuration if we are not running in distributed mode conf.setInt(HFile.FORMAT_VERSION_KEY, 3); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, Reader.class); conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, Writer.class); conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); } // Check if the cluster configuration can support this test try { EncryptionTest.testEncryption(conf, "AES", null); } catch (Exception e) { LOG.warn("Encryption configuration test did not pass, skipping test"); return; } super.setUpCluster(); initialized = true; }
/**
 * Compares two WAL files entry by entry.
 * <p>
 * Entries are read in lockstep; two entries match when their keys compare equal and the string
 * forms of their edits are equal.
 * @param p1 the first WAL file; iteration is driven by its entries
 * @param p2 the second WAL file
 * @return {@code false} on the first mismatch or if {@code p2} runs out of entries before
 *         {@code p1}; {@code true} once every entry of {@code p1} has been matched
 * @throws IOException if a reader cannot be created or an entry cannot be read
 */
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
  // try-with-resources closes both readers on every exit path, including the early
  // "not equal" returns and any exception thrown while reading.
  try (Reader in1 = wals.createReader(fs, p1); Reader in2 = wals.createReader(fs, p2)) {
    Entry entry1;
    while ((entry1 = in1.next()) != null) {
      Entry entry2 = in2.next();
      if (entry2 == null) {
        // p2 is shorter than p1: report "not equal" instead of failing with an NPE.
        return false;
      }
      if (entry1.getKey().compareTo(entry2.getKey()) != 0
          || !entry1.getEdit().toString().equals(entry2.getEdit().toString())) {
        return false;
      }
    }
    // NOTE(review): entries remaining in p2 after p1 is exhausted are not inspected, as
    // before this change; confirm whether callers expect that.
    return true;
  }
}
/** * @param conf must not be null, will keep a reference to read params in later reader/writer * instances. * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations * to make a directory */ public WALFactory(Configuration conf, String factoryId) throws IOException { // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); /* TODO Both of these are probably specific to the fs wal provider */ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, AbstractFSWALProvider.Reader.class); this.conf = conf; this.factoryId = factoryId; // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); } else { // special handling of existing configuration behavior. LOG.warn("Running with WAL disabled."); provider = new DisabledWALProvider(); provider.init(this, conf, factoryId); } }
@Override public void setUpCluster() throws Exception { util = getTestingUtil(null); Configuration conf = util.getConfiguration(); if (!util.isDistributedCluster()) { // Inject required configuration if we are not running in distributed mode conf.setInt(HFile.FORMAT_VERSION_KEY, 3); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, Reader.class); conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, Writer.class); conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); } // Check if the cluster configuration can support this test try { EncryptionTest.testEncryption(conf, "AES", null); } catch (Exception e) { LOG.warn("Encryption configuration test did not pass, skipping test", e); return; } super.setUpCluster(); initialized = true; }
private WALFactory(Configuration conf) { // this code is duplicated here so we can keep our members final. // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); /* TODO Both of these are probably specific to the fs wal provider */ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, DefaultWALProvider.Reader.class); this.conf = conf; // end required early initialization // this instance can't create wals, just reader/writers. provider = null; factoryId = SINGLETON_ID; }
static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) throws CorruptedLogFileException, IOException { try { return in.next(); } catch (EOFException eof) { // truncated files are expected if a RS crashes (see HBASE-2643) LOG.info("EOF from wal " + path + ". continuing"); return null; } catch (IOException e) { // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help if (e.getCause() != null && (e.getCause() instanceof ParseException || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { LOG.warn("Parse exception " + e.getCause().toString() + " from wal " + path + ". continuing"); return null; } if (!skipErrors) { throw e; } CorruptedLogFileException t = new CorruptedLogFileException("skipErrors=true Ignoring exception" + " while parsing wal " + path + ". Marking as corrupted"); t.initCause(e); throw t; } }
/**
 * Enables WAL encryption on the shared {@code TestWALReplay} configuration, then runs
 * {@code TestWALReplay.setUpBeforeClass()}.
 * @throws Exception if the delegated setup fails
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final Configuration walConf = TestWALReplay.TEST_UTIL.getConfiguration();

  // Turn encryption on and point at the testing key provider / master key.
  walConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
  walConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
  walConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());

  // Swap in the secure WAL reader and writer implementations.
  walConf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
    Writer.class);
  walConf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
    Reader.class);

  TestWALReplay.setUpBeforeClass();
}
private void ignoreCorruption(final Corruptions corruption, final int entryCount, final int expectedCount) throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, false); final String REGION = "region__1"; REGIONS.clear(); REGIONS.add(REGION); Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0"); generateWALs(1, entryCount, -1); corruptWAL(c1, corruption, true); useDifferentDFSClient(); WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); assertEquals(1, splitLog.length); int actualCount = 0; Reader in = wals.createReader(fs, splitLog[0]); @SuppressWarnings("unused") Entry entry; while ((entry = in.next()) != null) ++actualCount; assertEquals(expectedCount, actualCount); in.close(); // should not have stored the EOF files as corrupt FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR); assertEquals(archivedLogs.length, 0); }
/**
 * Counts the entries in a WAL file.
 * @param log the WAL file to read
 * @return the number of entries returned by the reader before it yields null
 * @throws IOException if the reader cannot be created or an entry cannot be read
 */
private int countWAL(Path log) throws IOException {
  int count = 0;
  // try-with-resources so the reader is closed even if next() throws.
  try (Reader in = wals.createReader(fs, log)) {
    while (in.next() != null) {
      count++;
    }
  }
  return count;
}
private WALFactory(Configuration conf) { // this code is duplicated here so we can keep our members final. // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); /* TODO Both of these are probably specific to the fs wal provider */ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, AbstractFSWALProvider.Reader.class); this.conf = conf; // end required early initialization // this instance can't create wals, just reader/writers. provider = null; factoryId = SINGLETON_ID; }
static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) throws CorruptedLogFileException, IOException { try { return in.next(); } catch (EOFException eof) { // truncated files are expected if a RS crashes (see HBASE-2643) LOG.info("EOF from wal {}. Continuing.", path); return null; } catch (IOException e) { // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help if (e.getCause() != null && (e.getCause() instanceof ParseException || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { LOG.warn("Parse exception from wal {}. Continuing", path, e); return null; } if (!skipErrors) { throw e; } CorruptedLogFileException t = new CorruptedLogFileException("skipErrors=true Ignoring exception" + " while parsing wal " + path + ". Marking as corrupted"); t.initCause(e); throw t; } }
/**
 * Resolves a clash between a newly written edits file and an existing file at the destination
 * by deleting one of them.
 * <p>
 * The sequence id of the first entry in {@code dst} is compared against
 * {@code wap.minLogSeqNum}. If {@code wap.minLogSeqNum} is smaller, {@code dst} is deleted;
 * otherwise {@code wap.p} is deleted. When {@code dst} has no first entry, or reading it hits
 * EOF, the destination's sequence id stays at -1 for the comparison.
 * @param wap the writer/path pair for the newly written file
 * @param dst the existing file at the destination
 * @throws IOException if reading {@code dst} fails for a reason other than EOF, or if the
 *           chosen file cannot be deleted
 */
private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
  long dstMinLogSeqNum = -1L;
  try (WAL.Reader dstReader = walFactory.createReader(fs, dst)) {
    final WAL.Entry firstEntry = dstReader.next();
    if (firstEntry != null) {
      dstMinLogSeqNum = firstEntry.getKey().getSequenceId();
    }
  } catch (EOFException e) {
    LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
      dst, e);
  }

  if (wap.minLogSeqNum < dstMinLogSeqNum) {
    // The new file has the smaller minimum sequence id: drop the existing destination file.
    LOG.warn("Found existing old edits file. It could be the result of a previous failed"
        + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
        + fs.getFileStatus(dst).getLen());
    if (fs.delete(dst, false)) {
      return;
    }
    LOG.warn("Failed deleting of old {}", dst);
    throw new IOException("Failed deleting of old " + dst);
  }

  // Otherwise drop the newly written file.
  LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
      + ", length=" + fs.getFileStatus(wap.p).getLen());
  if (fs.delete(wap.p, false)) {
    return;
  }
  LOG.warn("Failed deleting of {}", wap.p);
  throw new IOException("Failed deleting of " + wap.p);
}
private void ignoreCorruption(final Corruptions corruption, final int entryCount, final int expectedCount) throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, false); final String REGION = "region__1"; REGIONS.clear(); REGIONS.add(REGION); Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0"); generateWALs(1, entryCount, -1, 0); corruptWAL(c1, corruption, true); useDifferentDFSClient(); WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); assertEquals(1, splitLog.length); int actualCount = 0; Reader in = wals.createReader(fs, splitLog[0]); @SuppressWarnings("unused") Entry entry; while ((entry = in.next()) != null) ++actualCount; assertEquals(expectedCount, actualCount); in.close(); // should not have stored the EOF files as corrupt FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR); assertEquals(0, archivedLogs.length); }
/**
 * Enables WAL encryption with the async secure writer on the shared
 * {@code AbstractTestWALReplay} configuration, then runs
 * {@code TestAsyncWALReplay.setUpBeforeClass()}.
 * @throws Exception if the delegated setup fails
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final Configuration walConf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();

  // Turn encryption on and point at the testing key provider / master key.
  walConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
  walConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
  walConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());

  // Swap in the secure WAL reader and the secure async writer implementations.
  walConf.setClass("hbase.regionserver.hlog.async.writer.impl",
    SecureAsyncProtobufLogWriter.class, AsyncWriter.class);
  walConf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
    Reader.class);

  TestAsyncWALReplay.setUpBeforeClass();
}
/**
 * Enables WAL encryption on the shared {@code AbstractTestWALReplay} configuration, then runs
 * {@code AbstractTestWALReplay.setUpBeforeClass()}.
 * @throws Exception if the delegated setup fails
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final Configuration walConf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();

  // Turn encryption on and point at the testing key provider / master key.
  walConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
  walConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
  walConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());

  // Swap in the secure WAL reader and writer implementations.
  walConf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
    Writer.class);
  walConf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
    Reader.class);

  AbstractTestWALReplay.setUpBeforeClass();
}
/**
 * Creates a reader for the given WAL path without a progress reporter.
 * <p>
 * Delegates to the overload taking a {@code CancelableProgressable}, passing null.
 * @param fs the filesystem passed through to the delegate
 * @param path the WAL file to read
 * @return the reader produced by the delegate
 * @throws IOException if the delegate fails to create the reader
 */
public Reader createReader(final FileSystem fs, final Path path) throws IOException {
  final CancelableProgressable noReporter = null;
  return createReader(fs, path, noReporter);
}
/**
 * Create a reader for the WAL. If you are reading from a file that's being written to and need
 * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
 * then just seek back to the last known good position.
 * <p>
 * Delegates to the four-argument overload with its final flag set to {@code true}.
 * @param fs the filesystem passed through to the delegate
 * @param path the WAL file to read
 * @param reporter progress reporter passed through to the delegate
 * @return A WAL reader. Close when done with it.
 * @throws IOException if the delegate fails to create the reader
 */
public Reader createReader(final FileSystem fs, final Path path,
    CancelableProgressable reporter) throws IOException {
  final Reader walReader = createReader(fs, path, reporter, true);
  return walReader;
}
/**
 * Create a reader for the given path, accept custom reader classes from conf.
 * If you already have a WALFactory, you should favor the instance method.
 * <p>
 * Uses the factory returned by {@code getInstance(configuration)}.
 * @param fs the filesystem passed through to the instance method
 * @param path the WAL file to read
 * @param configuration configuration used to look up the factory instance
 * @return a WAL Reader, caller must close.
 * @throws IOException if the reader cannot be created
 */
public static Reader createReader(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
  final Reader walReader = getInstance(configuration).createReader(fs, path);
  return walReader;
}
/**
 * Create a reader for the given path, accept custom reader classes from conf.
 * If you already have a WALFactory, you should favor the instance method.
 * <p>
 * Uses the factory returned by {@code getInstance(configuration)}.
 * @param fs the filesystem passed through to the instance method
 * @param path the WAL file to read
 * @param configuration configuration used to look up the factory instance
 * @param reporter progress reporter passed through to the instance method
 * @return a WAL Reader, caller must close.
 * @throws IOException if the reader cannot be created
 */
static Reader createReader(final FileSystem fs, final Path path,
    final Configuration configuration, final CancelableProgressable reporter)
    throws IOException {
  final Reader walReader = getInstance(configuration).createReader(fs, path, reporter);
  return walReader;
}
/**
 * Create a reader for the given path, ignore custom reader classes from conf.
 * If you already have a WALFactory, you should favor the instance method.
 * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
 * <p>
 * Uses the factory returned by {@code getInstance(configuration)} and calls its four-argument
 * {@code createReader} with a null reporter and {@code false} as the final flag.
 * @param fs the filesystem passed through to the instance method
 * @param path the WAL file to read
 * @param configuration configuration used to look up the factory instance
 * @return a WAL Reader, caller must close.
 * @throws IOException if the reader cannot be created
 */
public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
  final Reader walReader = getInstance(configuration).createReader(fs, path, null, false);
  return walReader;
}
/**
 * Create a new {@link Reader} for reading logs to split.
 * <p>
 * Delegates to {@code walFactory.createReader} with this instance's filesystem.
 * @param curLogFile the log file to open
 * @param reporter progress reporter passed through to the factory
 * @return new Reader instance, caller should close
 * @throws IOException if the factory fails to create the reader
 */
protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
    throws IOException {
  final Reader logReader = walFactory.createReader(fs, curLogFile, reporter);
  return logReader;
}