private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, InterruptedException { String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); zkw.registerListener(listener); ZKUtil.watchAndCheckExists(zkw, tasknode); slm.enqueueSplitTask(name, batch); assertEquals(1, batch.installed); assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); assertEquals(1L, tot_mgr_node_create_queued.get()); LOG.debug("waiting for task node creation"); listener.waitForCreation(); LOG.debug("task created"); return tasknode; }
@Test (timeout=180000) public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.done) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); }
@Test (timeout=180000) public void testTaskErr() throws Exception { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT); }
@Test (timeout=180000) public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); assertEquals(tot_mgr_resubmit.get(), 0); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); assertEquals(tot_mgr_resubmit.get(), 0); SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode); assertEquals(tot_mgr_resubmit.get(), 0); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); int version = ZKUtil.checkExists(zkw, tasknode); // Could be small race here. if (tot_mgr_resubmit.get() == 0) { waitForCounter(tot_mgr_resubmit, 0, 1, to/2); } assertEquals(tot_mgr_resubmit.get(), 1); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); assertTrue(slt.isUnassigned(DUMMY_MASTER)); }
@Test (timeout=180000) public void testWorkerCrash() throws Exception { slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.get()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.get()); }
@Test public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); ZKUtil.setData(zkw, tasknode, TaskState.TASK_DONE.get("worker")); synchronized (batch) { while (batch.installed != batch.done) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); }
@Test public void testTaskErr() throws Exception { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker")); synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); }
@Test public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker")); int version = ZKUtil.checkExists(zkw, tasknode); waitForCounter(tot_mgr_resubmit, 0, 1, to/2); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); byte[] taskstate = ZKUtil.getData(zkw, tasknode); assertTrue(Arrays.equals(taskstate, TaskState.TASK_UNASSIGNED.get("dummy-master"))); }
@Test public void testDeadWorker() throws Exception { LOG.info("testDeadWorker"); conf.setLong("hbase.splitlog.max.resubmit", 0); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); int version = ZKUtil.checkExists(zkw, tasknode); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); slm.handleDeadWorker("worker1"); waitForCounter(tot_mgr_resubmit, 0, 1, to/2); waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); byte[] taskstate = ZKUtil.getData(zkw, tasknode); assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), taskstate)); return; }
@Test public void testWorkerCrash() throws Exception { conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = new ServerName("worker1,1,1"); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get(worker1.getServerName())); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.get()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.get()); }
@Test public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Done(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.done) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); }
@Test public void testTaskErr() throws Exception { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Err(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT); }
@Test public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); assertEquals(tot_mgr_resubmit.get(), 0); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); assertEquals(tot_mgr_resubmit.get(), 0); SplitLogTask slt = new SplitLogTask.Resigned(worker1); assertEquals(tot_mgr_resubmit.get(), 0); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); int version = ZKUtil.checkExists(zkw, tasknode); // Could be small race here. if (tot_mgr_resubmit.get() == 0) { waitForCounter(tot_mgr_resubmit, 0, 1, to/2); } assertEquals(tot_mgr_resubmit.get(), 1); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); assertTrue(slt.isUnassigned(DUMMY_MASTER)); }
@Test public void testWorkerCrash() throws Exception { slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.get()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.get()); }
private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, InterruptedException { String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); zkw.registerListener(listener); ZKUtil.watchAndCheckExists(zkw, tasknode); slm.enqueueSplitTask(name, batch); assertEquals(1, batch.installed); assertTrue(findOrCreateOrphanTask(tasknode).batch == batch); assertEquals(1L, tot_mgr_node_create_queued.sum()); LOG.debug("waiting for task node creation"); listener.waitForCreation(); LOG.debug("task created"); return tasknode; }
@Test (timeout=180000) public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Done(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.done) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); }
@Test (timeout=180000) public void testTaskErr() throws Exception { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Err(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); } } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT); }
@Test (timeout=180000) public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(0, tot_mgr_resubmit.sum()); slm = new SplitLogManager(master, conf); assertEquals(0, tot_mgr_resubmit.sum()); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); assertEquals(0, tot_mgr_resubmit.sum()); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); assertEquals(0, tot_mgr_resubmit.sum()); SplitLogTask slt = new SplitLogTask.Resigned(worker1); assertEquals(0, tot_mgr_resubmit.sum()); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); ZKUtil.checkExists(zkw, tasknode); // Could be small race here. if (tot_mgr_resubmit.sum() == 0) { waitForCounter(tot_mgr_resubmit, 0, 1, to/2); } assertEquals(1, tot_mgr_resubmit.sum()); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); assertTrue(slt.isUnassigned(master.getServerName())); }
@Test (timeout=180000) public void testWorkerCrash() throws Exception { slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.sum()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.sum()); }