@Override public int remainingTasksInCoordination() { int count = 0; try { List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); if (tasks != null) { int listSize = tasks.size(); for (int i = 0; i < listSize; i++) { if (!ZKSplitLog.isRescanNode(tasks.get(i))) { count++; } } } } catch (KeeperException ke) { LOG.warn("Failed to check remaining tasks", ke); count = -1; } return count; }
/** * signal the workers that a task was resubmitted by creating the RESCAN node. */ private void rescan(long retries) { // The RESCAN node will be deleted almost immediately by the // SplitLogManager as soon as it is created because it is being // created in the DONE state. This behavior prevents a buildup // of RESCAN nodes. But there is also a chance that a SplitLogWorker // might miss the watch-trigger that creation of RESCAN node provides. // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), getRecoveryMode()); this.watcher .getRecoverableZooKeeper() .getZooKeeper() .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); }
private void deleteNodeSuccess(String path) { if (ignoreZKDeleteForTesting) { return; } Task task; task = details.getTasks().remove(path); if (task == null) { if (ZKSplitLog.isRescanNode(watcher, path)) { SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); } SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); LOG.debug("deleted task without in memory state " + path); return; } synchronized (task) { task.status = DELETED; task.notify(); } SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); }
private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, InterruptedException { String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); zkw.registerListener(listener); ZKUtil.watchAndCheckExists(zkw, tasknode); slm.enqueueSplitTask(name, batch); assertEquals(1, batch.installed); assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); assertEquals(1L, tot_mgr_node_create_queued.get()); LOG.debug("waiting for task node creation"); listener.waitForCreation(); LOG.debug("task created"); return tasknode; }
@Test (timeout=180000) public void testOrphanTaskAcquisition() throws Exception { LOG.info("TestOrphanTaskAcquisition"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); assertFalse(task.isUnassigned()); long curt = System.currentTimeMillis(); assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000))); LOG.info("waiting for manager to resubmit the orphan task"); waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); assertTrue(task.isUnassigned()); waitForCounter(tot_mgr_rescan, 0, 1, to + to/2); }
@Ignore("DLR is broken by HBASE-12751") @Test(timeout=60000) public void testGetPreviousRecoveryMode() throws Exception { LOG.info("testGetPreviousRecoveryMode"); SplitLogCounters.resetCounters(); // Not actually enabling DLR for the cluster, just for the ZkCoordinatedStateManager to use. // The test is just manipulating ZK manually anyways. conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), new SplitLogTask.Unassigned( ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); LOG.info("Mode1=" + slm.getRecoveryMode()); assertTrue(slm.isLogSplitting()); zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); LOG.info("Mode2=" + slm.getRecoveryMode()); slm.setRecoveryMode(false); LOG.info("Mode3=" + slm.getRecoveryMode()); assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying()); }
@Test(timeout=60000) public void testAcquireTaskAtStartup() throws Exception { LOG.info("testAcquireTaskAtStartup"); SplitLogCounters.resetCounters(); final String TATAS = "tatas"; final ServerName RS = ServerName.valueOf("rs,1,1"); RegionServerServices mockedRS = getRegionServer(RS); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); SplitLogWorker slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS)); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(RS)); } finally { stopSplitLogWorker(slw); } }
private int remainingTasksInZK() { int count = 0; try { List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); if (tasks != null) { for (String t: tasks) { if (!ZKSplitLog.isRescanNode(watcher, t)) { count++; } } } } catch (KeeperException ke) { LOG.warn("Failed to check remaining tasks", ke); count = -1; } return count; }
private void deleteNodeSuccess(String path) { if (ignoreZKDeleteForTesting) { return; } Task task; task = tasks.remove(path); if (task == null) { if (ZKSplitLog.isRescanNode(watcher, path)) { tot_mgr_rescan_deleted.incrementAndGet(); } tot_mgr_missing_state_in_delete.incrementAndGet(); LOG.debug("deleted task without in memory state " + path); return; } synchronized (task) { task.status = DELETED; task.notify(); } tot_mgr_task_deleted.incrementAndGet(); }
public static void finishSplitLogFile(Path rootdir, Path oldLogDir, String logfile, Configuration conf) throws IOException { List<Path> processedLogs = new ArrayList<Path>(); List<Path> corruptedLogs = new ArrayList<Path>(); FileSystem fs; fs = rootdir.getFileSystem(conf); Path logPath = new Path(logfile); if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) { corruptedLogs.add(logPath); } else { processedLogs.add(logPath); } archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf); Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()); HBaseFileSystem.deleteDirFromFileSystem(fs, stagingDir); }
@Test public void testOrphanTaskAcquisition() throws Exception { LOG.info("TestOrphanTaskAcquisition"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); zkw.getRecoverableZooKeeper().create(tasknode, TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); assertFalse(task.isUnassigned()); long curt = System.currentTimeMillis(); assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000))); LOG.info("waiting for manager to resubmit the orphan task"); waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); assertTrue(task.isUnassigned()); waitForCounter(tot_mgr_rescan, 0, 1, to + to/2); }
@Test public void testTaskErr() throws Exception { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker")); synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); }
@Test public void testWorkerCrash() throws Exception { conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = new ServerName("worker1,1,1"); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get(worker1.getServerName())); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.get()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.get()); }
private void startCluster(int num_master, int num_rs, Configuration inConf) throws Exception { ZKSplitLog.Counters.resetCounters(); LOG.info("Starting cluster"); this.conf = inConf; conf.getLong("hbase.splitlog.max.resubmit", 0); // Make the failure test faster conf.setInt("zookeeper.recovery.retry", 0); TEST_UTIL = new HBaseTestingUtility(conf); TEST_UTIL.startMiniCluster(num_master, num_rs); cluster = TEST_UTIL.getHBaseCluster(); LOG.info("Waiting for active/ready master"); cluster.waitForActiveAndReadyMaster(); master = cluster.getMaster(); while (cluster.getLiveRegionServerThreads().size() < num_rs) { Threads.sleep(1); } }
@Test(timeout=60000) public void testAcquireTaskAtStartup() throws Exception { LOG.info("testAcquireTaskAtStartup"); ZKSplitLog.Counters.resetCounters(); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"), TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs", neverEndingTask); slw.start(); try { waitForCounter(tot_wkr_task_acquired, 0, 1, WAIT_TIME); assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tatas")), "rs")); } finally { stopSplitLogWorker(slw); } }
@Override public int remainingTasksInCoordination() { int count = 0; try { List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); if (tasks != null) { int listSize = tasks.size(); for (int i = 0; i < listSize; i++) { if (!ZKSplitLog.isRescanNode(watcher, tasks.get(i))) { count++; } } } } catch (KeeperException ke) { LOG.warn("Failed to check remaining tasks", ke); count = -1; } return count; }
@Test(timeout=60000) public void testGetPreviousRecoveryMode() throws Exception { LOG.info("testGetPreviousRecoveryMode"); SplitLogCounters.resetCounters(); Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); testConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); ds = new DummyServer(zkw, testConf); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), new SplitLogTask.Unassigned( ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); slm = new SplitLogManager(ds, testConf, stopper, master, DUMMY_MASTER); LOG.info("Mode1=" + slm.getRecoveryMode()); assertTrue(slm.isLogSplitting()); zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); LOG.info("Mode2=" + slm.getRecoveryMode()); slm.setRecoveryMode(false); LOG.info("Mode3=" + slm.getRecoveryMode()); assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying()); }
private void deleteNodeSuccess(String path) { if (ignoreZKDeleteForTesting) { return; } Task task; task = tasks.remove(path); if (task == null) { if (ZKSplitLog.isRescanNode(watcher, path)) { SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); } SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); LOG.debug("deleted task without in memory state " + path); return; } synchronized (task) { task.status = DELETED; task.notify(); } SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); }
/** * signal the workers that a task was resubmitted by creating the * RESCAN node. * @throws KeeperException */ private void createRescanNode(long retries) { // The RESCAN node will be deleted almost immediately by the // SplitLogManager as soon as it is created because it is being // created in the DONE state. This behavior prevents a buildup // of RESCAN nodes. But there is also a chance that a SplitLogWorker // might miss the watch-trigger that creation of RESCAN node provides. // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); SplitLogTask slt = new SplitLogTask.Done(this.serverName); this.watcher.getRecoverableZooKeeper().getZooKeeper(). create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); }
@Test public void testOrphanTaskAcquisition() throws Exception { LOG.info("TestOrphanTaskAcquisition"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); assertFalse(task.isUnassigned()); long curt = System.currentTimeMillis(); assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000))); LOG.info("waiting for manager to resubmit the orphan task"); waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); assertTrue(task.isUnassigned()); waitForCounter(tot_mgr_rescan, 0, 1, to + to/2); }
@Test(timeout=60000) public void testAcquireTaskAtStartup() throws Exception { LOG.info("testAcquireTaskAtStartup"); SplitLogCounters.resetCounters(); final String TATAS = "tatas"; final ServerName RS = ServerName.valueOf("rs,1,1"); RegionServerServices mockedRS = getRegionServer(RS); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS)); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(RS)); } finally { stopSplitLogWorker(slw); } }
@Override public int remainingTasksInCoordination() { int count = 0; try { List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.splitLogZNode); if (tasks != null) { int listSize = tasks.size(); for (int i = 0; i < listSize; i++) { if (!ZKSplitLog.isRescanNode(tasks.get(i))) { count++; } } } } catch (KeeperException ke) { LOG.warn("Failed to check remaining tasks", ke); count = -1; } return count; }
/** * signal the workers that a task was resubmitted by creating the RESCAN node. */ private void rescan(long retries) { // The RESCAN node will be deleted almost immediately by the // SplitLogManager as soon as it is created because it is being // created in the DONE state. This behavior prevents a buildup // of RESCAN nodes. But there is also a chance that a SplitLogWorker // might miss the watch-trigger that creation of RESCAN node provides. // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName()); this.watcher .getRecoverableZooKeeper() .getZooKeeper() .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); }