@Test public void testCleanupUnknownPeerZNode() throws Exception { Server server = new DummyServer("hostname2.example.org"); ReplicationQueueStorage rq = ReplicationStorageFactory .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode // add log to an unknown peer String group = "testgroup"; rq.addWAL(server.getServerName(), "2", group + ".log1"); rq.addWAL(server.getServerName(), "2", group + ".log2"); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); w1.run(); // The log of the unknown peer should be removed from zk for (String peer : manager.getAllQueues()) { assertTrue(peer.startsWith("1")); } }
@Test public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<String>(); String group = "testgroup"; String file1 = group + ".log1"; String file2 = group + ".log2"; files.add(file1); files.add(file2); for (String file : files) { rq.addLog("1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( new Long(1), new Long(2))); w1.start(); w1.join(5000); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); manager.cleanOldLogs(file2, id, true); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); }
@Test public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<String>(); files.add("log1"); files.add("log2"); for (String file : files) { rq.addLog("1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( new Long(1), new Long(2))); w1.start(); w1.join(5000); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id)); manager.cleanOldLogs("log2", id, true); // log1 should be deleted assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id)); }
@Test public void testCleanupFailoverQueues() throws Exception { Server server = new DummyServer("hostname1.example.org"); ReplicationQueueStorage rq = ReplicationStorageFactory .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<>(); String group = "testgroup"; String file1 = group + ".log1"; String file2 = group + ".log2"; files.add(file1); files.add(file2); for (String file : files) { rq.addWAL(server.getServerName(), "1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration()); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); w1.run(); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); manager.cleanOldLogs(file2, id, true); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); }