/**
 * Parse a single hlog and put the edits in entryBuffers.
 *
 * @param in the hlog reader
 * @param path the path of the log file
 * @param entryBuffers the buffer to hold the parsed edits
 * @param fs the file system
 * @param conf the configuration
 * @param skipErrors if true, wrap an IOException from a bad entry in a
 *          CorruptedLogFileException instead of rethrowing it
 * @throws IOException
 * @throws CorruptedLogFileException if the hlog is corrupted
 */
private void parseHLog(final Reader in, Path path, EntryBuffers entryBuffers,
    final FileSystem fs, final Configuration conf, boolean skipErrors)
    throws IOException, CorruptedLogFileException {
  int editsCount = 0;
  try {
    Entry entry;
    while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
      entryBuffers.appendEntry(entry);
      editsCount++;
    }
  } catch (InterruptedException ie) {
    // Preserve the interrupt as an IOException so callers see a failed parse.
    IOException t = new InterruptedIOException();
    t.initCause(ie);
    throw t;
  } finally {
    LOG.debug("Pushed=" + editsCount + " entries from " + path);
  }
}
@Test(expected = IOException.class)
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
    throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, false);
  Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
  InstrumentedSequenceFileLogWriter.activateFailure = false;
  HLog.resetLogReaderClass();
  try {
    conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
    conf.set("faultysequencefilelogreader.failuretype",
        FaultySequenceFileLogReader.FailureType.BEGINNING.name());
    generateHLogs(Integer.MAX_VALUE);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();
  } finally {
    conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    HLog.resetLogReaderClass();
  }
}
@Test(timeout = 300000, expected = IOException.class)
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
    throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, false);
  Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
  InstrumentedSequenceFileLogWriter.activateFailure = false;
  HLogFactory.resetLogReaderClass();
  try {
    conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
    conf.set("faultysequencefilelogreader.failuretype",
        FaultySequenceFileLogReader.FailureType.BEGINNING.name());
    generateHLogs(Integer.MAX_VALUE);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
  } finally {
    conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    HLogFactory.resetLogReaderClass();
  }
}
@Test(expected = IOException.class)
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
    throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, false);
  Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
  InstrumentedSequenceFileLogWriter.activateFailure = false;
  HLogFactory.resetLogReaderClass();
  try {
    conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
    conf.set("faultysequencefilelogreader.failuretype",
        FaultySequenceFileLogReader.FailureType.BEGINNING.name());
    generateHLogs(Integer.MAX_VALUE);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
    logSplitter.splitLog();
  } finally {
    conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    HLogFactory.resetLogReaderClass();
  }
}
private static Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
    throws CorruptedLogFileException, IOException {
  try {
    return in.next();
  } catch (EOFException eof) {
    // truncated files are expected if a RS crashes (see HBASE-2643)
    LOG.info("EOF from hlog " + path + ". Continuing.");
    return null;
  } catch (IOException e) {
    // If the IOE resulted from bad file format,
    // then this problem is idempotent and retrying won't help
    if (e.getCause() != null
        && (e.getCause() instanceof ParseException
            || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
      LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
          + path + ". Continuing.");
      return null;
    }
    if (!skipErrors) {
      throw e;
    }
    CorruptedLogFileException t = new CorruptedLogFileException(
        "skipErrors=true; ignoring exception while parsing hlog " + path
            + ". Marking as corrupted.");
    t.initCause(e);
    throw t;
  }
}
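When skipErrors is true, the CorruptedLogFileException thrown above is a signal to the caller to quarantine the file and carry on, which is what the archived-as-corrupt assertions in the tests below depend on. A minimal sketch of that handling, using only the standard Hadoop FileSystem API (the helper name and the corruptDir parameter are illustrative, not part of this excerpt):

// Illustrative sketch, not the actual split driver: move a log that failed
// to parse into the corrupt directory so splitting can continue.
private void archiveCorruptLog(Path log, Path corruptDir, FileSystem fs)
    throws IOException {
  fs.mkdirs(corruptDir); // no-op if the directory already exists
  Path dest = new Path(corruptDir, log.getName());
  if (!fs.rename(log, dest)) {
    LOG.warn("Unable to move corrupt log " + log + " to " + dest);
  }
}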
@Test
public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, true);
  Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
  InstrumentedSequenceFileLogWriter.activateFailure = false;
  HLog.resetLogReaderClass();
  try {
    Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
    conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
    for (FaultySequenceFileLogReader.FailureType failureType
        : FaultySequenceFileLogReader.FailureType.values()) {
      conf.set("faultysequencefilelogreader.failuretype", failureType.name());
      generateHLogs(1, ENTRIES, -1);
      fs.initialize(fs.getUri(), conf);
      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
          hbaseDir, hlogDir, oldLogDir, fs);
      logSplitter.splitLog();
      FileStatus[] archivedLogs = fs.listStatus(corruptDir);
      assertEquals("expected a different file", c1.getName(),
          archivedLogs[0].getPath().getName());
      // JUnit convention: expected value first.
      assertEquals(1, archivedLogs.length);
      fs.delete(new Path(oldLogDir, HLOG_FILE_PREFIX + "0"), false);
    }
  } finally {
    conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    HLog.resetLogReaderClass();
  }
}
@Test
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
    throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, false);
  Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
  InstrumentedSequenceFileLogWriter.activateFailure = false;
  HLog.resetLogReaderClass();
  try {
    conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
    conf.set("faultysequencefilelogreader.failuretype",
        FaultySequenceFileLogReader.FailureType.BEGINNING.name());
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    try {
      logSplitter.splitLog();
      // Guard against splitLog() silently succeeding on a corrupt log.
      fail("splitLog() should have thrown with skip.errors=false");
    } catch (IOException e) {
      assertEquals("if skip.errors is false all files should remain in place",
          NUM_WRITERS, fs.listStatus(hlogDir).length);
    }
  } finally {
    conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    HLog.resetLogReaderClass();
  }
}
@SuppressWarnings("unused") private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException { HLog.Entry entry; HLog.Reader in = HLog.getReader(fs, log, conf); while ((entry = in.next()) != null) { System.out.println(entry); } }
private int countHLog(Path log, FileSystem fs, Configuration conf)
    throws IOException {
  int count = 0;
  HLog.Reader in = HLog.getReader(fs, log, conf);
  try {
    while (in.next() != null) {
      count++;
    }
  } finally {
    in.close(); // don't leak the reader
  }
  return count;
}
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
  HLog.Reader in1 = HLog.getReader(fs, p1, conf);
  HLog.Reader in2 = HLog.getReader(fs, p2, conf);
  try {
    HLog.Entry entry1;
    HLog.Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      // If the second log runs out first, or any entry differs, the logs differ.
      if (entry2 == null
          || entry1.getKey().compareTo(entry2.getKey()) != 0
          || !entry1.getEdit().toString().equals(entry2.getEdit().toString())) {
        return false;
      }
    }
    // The second log must not have extra trailing entries either.
    return in2.next() == null;
  } finally {
    in1.close();
    in2.close();
  }
}
private void verifySplits(List<Path> splits, final int howmany)
    throws IOException {
  assertEquals(howmany, splits.size());
  for (int i = 0; i < splits.size(); i++) {
    LOG.info("Verifying=" + splits.get(i));
    HLog.Reader reader = HLog.getReader(fs, splits.get(i), conf);
    try {
      int count = 0;
      String previousRegion = null;
      long seqno = -1;
      HLog.Entry entry = new HLog.Entry();
      while ((entry = reader.next(entry)) != null) {
        HLogKey key = entry.getKey();
        String region = Bytes.toString(key.getEncodedRegionName());
        // Assert that all edits are for the same region.
        if (previousRegion != null) {
          assertEquals(previousRegion, region);
        }
        LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
        assertTrue(seqno < key.getLogSeqNum());
        seqno = key.getLogSeqNum();
        previousRegion = region;
        count++;
      }
      assertEquals(howmany * howmany, count);
    } finally {
      reader.close();
    }
  }
}
@Test(timeout = 300000)
public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, true);
  Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
  InstrumentedSequenceFileLogWriter.activateFailure = false;
  HLogFactory.resetLogReaderClass();
  try {
    Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
    conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
    for (FaultySequenceFileLogReader.FailureType failureType
        : FaultySequenceFileLogReader.FailureType.values()) {
      conf.set("faultysequencefilelogreader.failuretype", failureType.name());
      generateHLogs(1, ENTRIES, -1);
      fs.initialize(fs.getUri(), conf);
      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
      FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
      assertEquals("expected a different file", c1.getName(),
          archivedLogs[0].getPath().getName());
      // JUnit convention: expected value first.
      assertEquals(1, archivedLogs.length);
      fs.delete(new Path(OLDLOGDIR, HLOG_FILE_PREFIX + "0"), false);
    }
  } finally {
    conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    HLogFactory.resetLogReaderClass();
  }
}
@Test(timeout = 300000)
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
    throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, false);
  Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
  InstrumentedSequenceFileLogWriter.activateFailure = false;
  HLogFactory.resetLogReaderClass();
  try {
    conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
    conf.set("faultysequencefilelogreader.failuretype",
        FaultySequenceFileLogReader.FailureType.BEGINNING.name());
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);
    try {
      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
      // Guard against split() silently succeeding on a corrupt log.
      fail("split() should have thrown with skip.errors=false");
    } catch (IOException e) {
      assertEquals("if skip.errors is false all files should remain in place",
          NUM_WRITERS, fs.listStatus(HLOGDIR).length);
    }
  } finally {
    conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    HLogFactory.resetLogReaderClass();
  }
}
@Test(timeout = 300000)
public void testEOFisIgnored() throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, false);
  final String REGION = "region__1";
  REGIONS.clear();
  REGIONS.add(REGION);
  int entryCount = 10;
  Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
  generateHLogs(1, entryCount, -1);
  corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
  fs.initialize(fs.getUri(), conf);
  HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);

  Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
  assertEquals(1, splitLog.length);

  // Only the entry cut off mid-write should be lost.
  int actualCount = 0;
  HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
  @SuppressWarnings("unused")
  HLog.Entry entry;
  while ((entry = in.next()) != null) ++actualCount;
  assertEquals(entryCount - 1, actualCount);

  // should not have stored the EOF files as corrupt
  FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
  assertEquals(0, archivedLogs.length);
}
@Test(timeout = 300000)
public void testCorruptWALTrailer() throws IOException {
  conf.setBoolean(HBASE_SKIP_ERRORS, false);
  final String REGION = "region__1";
  REGIONS.clear();
  REGIONS.add(REGION);
  int entryCount = 10;
  Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
  generateHLogs(1, entryCount, -1);
  corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs);
  fs.initialize(fs.getUri(), conf);
  HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);

  Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
  assertEquals(1, splitLog.length);

  // A truncated trailer must not cost us any edits.
  int actualCount = 0;
  HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
  @SuppressWarnings("unused")
  HLog.Entry entry;
  while ((entry = in.next()) != null) ++actualCount;
  assertEquals(entryCount, actualCount);

  // should not have stored the file with the truncated trailer as corrupt
  FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
  assertEquals(0, archivedLogs.length);
}
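Both truncation tests above rely on a corruptHLog(...) helper that is referenced but not shown in this excerpt. A minimal sketch of what a TRUNCATE-style corruption could look like (the method name, the 32-byte cut, and the rewrite-in-place approach are assumptions for illustration):

// Illustrative only: chop the tail off a log so a reader hits EOF mid-entry.
private void truncateHLog(Path log, FileSystem fs) throws IOException {
  int len = (int) fs.getFileStatus(log).getLen();
  byte[] contents = new byte[len];
  FSDataInputStream in = fs.open(log);
  try {
    in.readFully(contents);
  } finally {
    in.close();
  }
  // Rewrite the file without its last bytes.
  FSDataOutputStream out = fs.create(log, true);
  try {
    out.write(contents, 0, Math.max(0, len - 32));
  } finally {
    out.close();
  }
}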
@SuppressWarnings("unused") private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException { HLog.Entry entry; HLog.Reader in = HLogFactory.createReader(fs, log, conf); while ((entry = in.next()) != null) { System.out.println(entry); } }
private int countHLog(Path log, FileSystem fs, Configuration conf)
    throws IOException {
  int count = 0;
  HLog.Reader in = HLogFactory.createReader(fs, log, conf);
  try {
    while (in.next() != null) {
      count++;
    }
  } finally {
    in.close(); // don't leak the reader
  }
  return count;
}
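For reference, countHLog is typically used right after a split to check that no edits were lost. A hedged usage sketch, reusing names that appear elsewhere in this excerpt (the region name and expected count are illustrative):

// Illustrative usage: the total edits recovered for a region should match
// what was written to it. ENTRIES and getLogForRegion come from the
// surrounding tests; the exact expected value depends on the test setup.
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, "region__1");
assertEquals(1, splitLog.length);
assertEquals(ENTRIES, countHLog(splitLog[0], fs, conf));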
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
  HLog.Reader in1 = HLogFactory.createReader(fs, p1, conf);
  HLog.Reader in2 = HLogFactory.createReader(fs, p2, conf);
  try {
    HLog.Entry entry1;
    HLog.Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      // If the second log runs out first, or any entry differs, the logs differ.
      if (entry2 == null
          || entry1.getKey().compareTo(entry2.getKey()) != 0
          || !entry1.getEdit().toString().equals(entry2.getEdit().toString())) {
        return false;
      }
    }
    // The second log must not have extra trailing entries either.
    return in2.next() == null;
  } finally {
    in1.close();
    in2.close();
  }
}
private void verifySplits(List<Path> splits, final int howmany)
    throws IOException {
  assertEquals(howmany * howmany, splits.size());
  for (int i = 0; i < splits.size(); i++) {
    LOG.info("Verifying=" + splits.get(i));
    HLog.Reader reader = HLogFactory.createReader(fs, splits.get(i), conf);
    try {
      int count = 0;
      String previousRegion = null;
      long seqno = -1;
      HLog.Entry entry = new HLog.Entry();
      while ((entry = reader.next(entry)) != null) {
        HLogKey key = entry.getKey();
        String region = Bytes.toString(key.getEncodedRegionName());
        // Assert that all edits are for the same region.
        if (previousRegion != null) {
          assertEquals(previousRegion, region);
        }
        LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
        assertTrue(seqno < key.getLogSeqNum());
        seqno = key.getLogSeqNum();
        previousRegion = region;
        count++;
      }
      assertEquals(howmany, count);
    } finally {
      reader.close();
    }
  }
}