@Test (timeout=180000) public void testTaskErr() throws Exception { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT); }
@Test (timeout=180000) public void testTaskErr() throws Exception { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Err(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT); }
public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) { super(controller, entryBuffers, numWriters); this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriters); this.logRecoveredEditsOutputSink.setReporter(reporter); }
public LogReplayOutputSink(int numWriters) { super(numWriters); this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters); this.logRecoveredEditsOutputSink.setReporter(reporter); }
@Test(timeout=30000) public void testDelayedDeleteOnFailure() throws Exception { LOG.info("testDelayedDeleteOnFailure"); startCluster(1); final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; final FileSystem fs = master.getMasterFileSystem().getFileSystem(); final Path logDir = new Path(FSUtils.getRootDir(conf), "x"); fs.mkdirs(logDir); ExecutorService executor = null; try { final Path corruptedLogFile = new Path(logDir, "x"); FSDataOutputStream out; out = fs.create(corruptedLogFile); out.write(0); out.write(Bytes.toBytes("corrupted bytes")); out.close(); ZKSplitLogManagerCoordination coordination = (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master .getCoordinatedStateManager()).getSplitLogManagerCoordination(); coordination.setIgnoreDeleteForTesting(true); executor = Executors.newSingleThreadExecutor(); Runnable runnable = new Runnable() { @Override public void run() { try { // since the logDir is a fake, corrupted one, so the split log worker // will finish it quickly with error, and this call will fail and throw // an IOException. slm.splitLogDistributed(logDir); } catch (IOException ioe) { try { assertTrue(fs.exists(corruptedLogFile)); // this call will block waiting for the task to be removed from the // tasks map which is not going to happen since ignoreZKDeleteForTesting // is set to true, until it is interrupted. slm.splitLogDistributed(logDir); } catch (IOException e) { assertTrue(Thread.currentThread().isInterrupted()); return; } fail("did not get the expected IOException from the 2nd call"); } fail("did not get the expected IOException from the 1st call"); } }; Future<?> result = executor.submit(runnable); try { result.get(2000, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { // it is ok, expected. } waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); executor.shutdownNow(); executor = null; // make sure the runnable is finished with no exception thrown. result.get(); } finally { if (executor != null) { // interrupt the thread in case the test fails in the middle. // it has no effect if the thread is already terminated. executor.shutdownNow(); } fs.delete(logDir, true); } }
@Test(timeout = 30000) public void testDelayedDeleteOnFailure() throws Exception { LOG.info("testDelayedDeleteOnFailure"); startCluster(1); final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); final FileSystem fs = master.getMasterFileSystem().getFileSystem(); final Path logDir = new Path(new Path(FSUtils.getRootDir(conf), HConstants.HREGION_LOGDIR_NAME), ServerName.valueOf("x", 1, 1).toString()); fs.mkdirs(logDir); ExecutorService executor = null; try { final Path corruptedLogFile = new Path(logDir, "x"); FSDataOutputStream out; out = fs.create(corruptedLogFile); out.write(0); out.write(Bytes.toBytes("corrupted bytes")); out.close(); ZKSplitLogManagerCoordination coordination = (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager()) .getSplitLogManagerCoordination(); coordination.setIgnoreDeleteForTesting(true); executor = Executors.newSingleThreadExecutor(); Runnable runnable = new Runnable() { @Override public void run() { try { // since the logDir is a fake, corrupted one, so the split log worker // will finish it quickly with error, and this call will fail and throw // an IOException. slm.splitLogDistributed(logDir); } catch (IOException ioe) { try { assertTrue(fs.exists(corruptedLogFile)); // this call will block waiting for the task to be removed from the // tasks map which is not going to happen since ignoreZKDeleteForTesting // is set to true, until it is interrupted. slm.splitLogDistributed(logDir); } catch (IOException e) { assertTrue(Thread.currentThread().isInterrupted()); return; } fail("did not get the expected IOException from the 2nd call"); } fail("did not get the expected IOException from the 1st call"); } }; Future<?> result = executor.submit(runnable); try { result.get(2000, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { // it is ok, expected. } waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); executor.shutdownNow(); executor = null; // make sure the runnable is finished with no exception thrown. result.get(); } finally { if (executor != null) { // interrupt the thread in case the test fails in the middle. // it has no effect if the thread is already terminated. executor.shutdownNow(); } fs.delete(logDir, true); } }