void getDataSetWatchSuccess(String path, byte[] data) { synchronized (grabTaskLock) { if (workerInGrabTask) { // currentTask can change but that's ok String taskpath = currentTask; if (taskpath != null && taskpath.equals(path)) { // have to compare data. cannot compare version because then there // will be race with attemptToOwnTask() // cannot just check whether the node has been transitioned to // UNASSIGNED because by the time this worker sets the data watch // the node might have made two transitions - from owned by this // worker to unassigned to owned by another worker if (! TaskState.TASK_OWNED.equals(data, serverName) && ! TaskState.TASK_DONE.equals(data, serverName) && ! TaskState.TASK_ERR.equals(data, serverName) && ! TaskState.TASK_RESIGNED.equals(data, serverName)) { LOG.info("task " + taskpath + " preempted from " + serverName + ", current task state and owner=" + new String(data)); stopTask(); } } } } }
/**
 * Creates a task znode that is already marked as owned by "dummy-worker"
 * before the manager is constructed, then checks that the manager acquires it
 * as an orphan, records a heartbeat for it, and later resubmits it and
 * rescans.
 */
@Test
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Pre-create the node so that the manager finds it on start-up.
  String orphanNode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  zkw.getRecoverableZooKeeper().create(
      orphanNode,
      TaskState.TASK_OWNED.get("dummy-worker"),
      Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

  slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null);
  slm.finishInitialization();
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  Task orphan = slm.findOrCreateOrphanTask(orphanNode);
  assertTrue(orphan.isOrphan());

  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // last_update must lie within the last second, and not in the future.
  long now = System.currentTimeMillis();
  assertTrue(orphan.last_update <= now);
  assertTrue(orphan.last_update > (now - 1000));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Submits a task, writes a DONE state for "worker" into its znode, and checks
 * that the batch reports the task as done and that the znode is removed.
 */
@Test
public void testTaskDone() throws Exception {
  LOG.info("TestTaskDone - cleanup task node once in DONE state");

  slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null);
  slm.finishInitialization();

  TaskBatch submitted = new TaskBatch();
  String taskPath = submitTaskAndWait(submitted, "foo/1");

  // Pretend a worker finished the task.
  byte[] doneByWorker = TaskState.TASK_DONE.get("worker");
  ZKUtil.setData(zkw, taskPath, doneByWorker);

  // Block until every installed task of the batch is counted as done.
  synchronized (submitted) {
    while (true) {
      if (submitted.installed == submitted.done) {
        break;
      }
      submitted.wait();
    }
  }

  waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
  int existsResult = ZKUtil.checkExists(zkw, taskPath);
  assertTrue(existsResult == -1);
}
/**
 * Submits a task with resubmission disabled, writes an ERR state for "worker"
 * into its znode, and checks that the batch reports the task as errored and
 * that the znode is removed.
 * <p>
 * The test overrides {@code hbase.splitlog.max.resubmit} on the shared
 * {@code conf}; the override is undone in a {@code finally} block so that a
 * failing assertion cannot leak the value 0 to other tests using {@code conf}.
 */
@Test
public void testTaskErr() throws Exception {
  LOG.info("TestTaskErr - cleanup task node once in ERR state");

  conf.setInt("hbase.splitlog.max.resubmit", 0);
  try {
    slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null);
    slm.finishInitialization();
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker"));

    // Block until every installed task of the batch is counted as an error.
    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
  } finally {
    // Always put the setting back, even when the test fails half-way.
    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);
  }
}
/**
 * Submits a task, writes a RESIGNED state for "worker" into its znode, and
 * checks that the manager resubmits it: the resubmit counter goes to 1, the
 * node has been written again, and its data is UNASSIGNED by "dummy-master".
 * <p>
 * The baseline version is sampled <em>before</em> the RESIGNED write. Sampling
 * it afterwards races with the manager: if the manager rewrites the node
 * before the sample is taken, both samples are equal and the test fails
 * although the manager behaved correctly.
 */
@Test
public void testTaskResigned() throws Exception {
  LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
  slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null);
  slm.finishInitialization();
  TaskBatch batch = new TaskBatch();
  String tasknode = submitTaskAndWait(batch, "foo/1");

  // Baseline taken while nothing is expected to touch the node yet; this
  // test treats the value returned by checkExists as the node version.
  int versionBeforeResign = ZKUtil.checkExists(zkw, tasknode);
  ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker"));

  waitForCounter(tot_mgr_resubmit, 0, 1, to/2);

  // Two writes must have happened since the baseline: the RESIGNED write
  // above and the manager's resubmit. Each data write bumps the version by
  // one, so this is the same condition as comparing against a sample taken
  // right after the RESIGNED write, minus the race.
  int version1 = ZKUtil.checkExists(zkw, tasknode);
  assertTrue(version1 > versionBeforeResign + 1);

  byte[] taskstate = ZKUtil.getData(zkw, tasknode);
  assertTrue(Arrays.equals(taskstate,
      TaskState.TASK_UNASSIGNED.get("dummy-master")));
}
/**
 * Submits a task with resubmission disabled, marks it as owned by "worker1",
 * reports "worker1" as dead to the manager, and checks that the task is
 * resubmitted anyway: the resubmit counters go to 1, the node version grows,
 * and the node data is UNASSIGNED by "dummy-master".
 * <p>
 * The test overrides {@code hbase.splitlog.max.resubmit} on the shared
 * {@code conf}; the override is undone in a {@code finally} block so that it
 * does not leak to other tests using {@code conf}.
 */
@Test
public void testDeadWorker() throws Exception {
  LOG.info("testDeadWorker");

  // setInt, like the other tests that touch this key.
  conf.setInt("hbase.splitlog.max.resubmit", 0);
  try {
    slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null);
    slm.finishInitialization();
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    // Baseline version, sampled before any state change made by this test.
    int version = ZKUtil.checkExists(zkw, tasknode);

    ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);

    slm.handleDeadWorker("worker1");
    waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
    waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);

    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
        taskstate));
  } finally {
    // Always put the setting back, even when the test fails half-way.
    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);
  }
}
@Test public void testWorkerCrash() throws Exception { conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = new ServerName("worker1,1,1"); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get(worker1.getServerName())); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.get()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.get()); }
/**
 * Creates an UNASSIGNED task znode before the worker is started and checks
 * that the worker "rs" acquires it, i.e. that the znode data becomes OWNED by
 * "rs". The worker is stopped whatever the outcome.
 */
@Test(timeout=60000)
public void testAcquireTaskAtStartup() throws Exception {
  LOG.info("testAcquireTaskAtStartup");
  ZKSplitLog.Counters.resetCounters();

  // The task exists before the worker does.
  final String taskNode = ZKSplitLog.getEncodedNodeName(zkw, "tatas");
  zkw.getRecoverableZooKeeper().create(
      taskNode,
      TaskState.TASK_UNASSIGNED.get("mgr"),
      Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

  SplitLogWorker worker =
      new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs", neverEndingTask);
  worker.start();
  try {
    waitForCounter(tot_wkr_task_acquired, 0, 1, WAIT_TIME);
    byte[] nodeData = ZKUtil.getData(zkw, taskNode);
    boolean ownedByRs = TaskState.TASK_OWNED.equals(nodeData, "rs");
    assertTrue(ownedByRs);
  } finally {
    stopSplitLogWorker(worker);
  }
}
void getDataSetWatchSuccess(String path, byte[] data) { synchronized (grabTaskLock) { if (workerInGrabTask) { // currentTask can change but that's ok String taskpath = currentTask; if (taskpath != null && taskpath.equals(path)) { // have to compare data. cannot compare version because then there // will be race with ownTask() // cannot just check whether the node has been transitioned to // UNASSIGNED because by the time this worker sets the data watch // the node might have made two transitions - from owned by this // worker to unassigned to owned by another worker if (! TaskState.TASK_OWNED.equals(data, serverName) && ! TaskState.TASK_DONE.equals(data, serverName) && ! TaskState.TASK_ERR.equals(data, serverName) && ! TaskState.TASK_RESIGNED.equals(data, serverName)) { LOG.info("task " + taskpath + " preempted from " + serverName + ", current task state and owner=" + new String(data)); stopTask(); } } } } }
/**
 * Puts an UNASSIGNED task znode in place, starts a worker named "rs", and
 * verifies that the worker takes the task: the acquired counter goes from 0 to
 * 1 and the znode data is OWNED by "rs". The worker is always stopped at the
 * end.
 */
@Test
public void testAcquireTaskAtStartup() throws Exception {
  LOG.info("testAcquireTaskAtStartup");
  ZKSplitLog.Counters.resetCounters();

  // Task node created up front, before any worker runs.
  zkw.getRecoverableZooKeeper().create(
      ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
      TaskState.TASK_UNASSIGNED.get("mgr"),
      Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

  SplitLogWorker rsWorker = new SplitLogWorker(
      zkw, TEST_UTIL.getConfiguration(), "rs", neverEndingTask);
  rsWorker.start();
  try {
    waitForCounter(tot_wkr_task_acquired, 0, 1, 1500);

    byte[] currentData =
        ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tatas"));
    boolean acquiredByRs = TaskState.TASK_OWNED.equals(currentData, "rs");
    assertTrue(acquiredByRs);
  } finally {
    stopSplitLogWorker(rsWorker);
  }
}