/**
 * Builds and initializes the replication source for one peer.
 * <p>
 * The implementation class name is read from the
 * {@code replication.replicationsource.implementation} configuration key (defaulting to
 * {@link ReplicationSource}) and instantiated reflectively. If looking up, loading or
 * instantiating that class fails with any exception, a warning is logged and a plain
 * {@link ReplicationSource} is used instead.
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues passed through unchanged to the source's {@code init}
 * @param replicationPeers passed through unchanged to the source's {@code init}
 * @param stopper the stopper object for this region server
 * @param peerId the id of the peer cluster
 * @param clusterId passed through unchanged to the source's {@code init}
 * @return the created source, after {@code init} has been called on it
 * @throws IOException if {@code init} on the created source throws it
 */
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
    final FileSystem fs, final ReplicationSourceManager manager,
    final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
    final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
  ReplicationSourceInterface source;
  try {
    // The lookup stays inside the try so that any failure falls back to the default below.
    final String implName = conf.get("replication.replicationsource.implementation",
        ReplicationSource.class.getCanonicalName());
    final Class<?> implClass = Class.forName(implName);
    source = (ReplicationSourceInterface) implClass.newInstance();
  } catch (Exception e) {
    LOG.warn("Passed replication source implementation throws errors, "
        + "defaulting to ReplicationSource", e);
    source = new ReplicationSource();
  }
  source.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
      clusterId);
  return source;
}
/** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use * @param server the server for this region server * @param fs the file system to use * @param logDir the directory that contains all wal directories of live RSs * @param oldLogDir the directory where old logs are archived * @param clusterId */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final Configuration conf, final Server server, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; this.walsById = new HashMap<String, Map<String, SortedSet<String>>>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>(); this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds this.clusterId = clusterId; this.replicationTracker.registerListener(this); this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. 
int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); this.latestPaths = Collections.synchronizedSet(new HashSet<Path>()); }
/**
 * Creates a failover worker for the given region server znode. The superclass is constructed
 * with the name {@code "Failover-for-" + rsZnode}.
 * @param rsZnode the region server znode this worker is created for
 * @param replicationQueues the replication queues handle kept by this worker
 * @param replicationPeers the replication peers handle kept by this worker
 * @param clusterId the cluster id kept by this worker
 */
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final UUID clusterId) {
  super("Failover-for-" + rsZnode);
  this.clusterId = clusterId;
  this.rp = replicationPeers;
  this.rq = replicationQueues;
  this.rsZnode = rsZnode;
}
/**
 * Instantiation method used by region servers. Copies the supplied configuration, reads the
 * replication tunables from the copy and stores the collaborators on this source.
 *
 * @param conf configuration to use; a copy is made via {@code HBaseConfiguration.create}
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param replicationQueues the replication queues handle stored on this source
 * @param replicationPeers the replication peers handle stored on this source
 * @param stopper the stoppable stored on this source
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for replication source
 * @throws IOException declared by the interface; nothing in this body throws it explicitly
 */
@Override
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId,
    ReplicationEndpoint replicationEndpoint, final MetricsSource metrics) throws IOException {
  this.stopper = stopper;
  // Work on a private copy so decorateConf() does not touch the caller's configuration.
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();

  // Tunables, all read from the decorated copy.
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  // 1 second
  this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
  // 5 minutes @ 1 sec per
  this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.maxretriesmultiplier", 300);
  this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);

  // The throttler is given one tenth of the configured per-peer bandwidth.
  final long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  final double throttlerBandwidth = bandwidth / 10.0;
  this.throttler = new ReplicationThrottler(throttlerBandwidth);

  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.manager = manager;
  this.fs = fs;
  this.metrics = metrics;
  this.clusterId = clusterId;

  this.peerClusterZnode = peerClusterZnode;
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();

  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
  this.replicationEndpoint = replicationEndpoint;
}
@Test public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<String>(); String group = "testgroup"; String file1 = group + ".log1"; String file2 = group + ".log2"; files.add(file1); files.add(file2); for (String file : files) { rq.addLog("1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( new Long(1), new Long(2))); w1.start(); w1.join(5000); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); manager.cleanOldLogs(file2, id, true); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); }
@Test public void testFailoverDeadServerCversionChange() throws Exception { LOG.debug("testFailoverDeadServerCversionChange"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server s0 = new DummyServer("cversion-change0.example.org"); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0); repQueues.init(s0.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { repQueues.addLog("1", file); } // simulate queue transfer Server s1 = new DummyServer("cversion-change1.example.org"); ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); ReplicationQueuesClient client = ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1); int v0 = client.getQueuesZNodeCversion(); rq1.claimQueues(s0.getServerName().getServerName()); int v1 = client.getQueuesZNodeCversion(); // cversion should increased by 1 since a child node is deleted assertEquals(v0 + 1, v1); s0.abort("", null); }
/** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use * @param server the server for this region server * @param fs the file system to use * @param logDir the directory that contains all wal directories of live RSs * @param oldLogDir the directory where old logs are archived * @param clusterId */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final Configuration conf, final Server server, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; this.walsById = new HashMap<String, SortedSet<String>>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>(); this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds this.clusterId = clusterId; this.replicationTracker.registerListener(this); this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. 
int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); }
@Test public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<String>(); files.add("log1"); files.add("log2"); for (String file : files) { rq.addLog("1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( new Long(1), new Long(2))); w1.start(); w1.join(5000); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id)); manager.cleanOldLogs("log2", id, true); // log1 should be deleted assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id)); }
/** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived * @param clusterId */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { this.sources = new ArrayList<ReplicationSourceInterface>(); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.stopper = stopper; this.hlogsById = new HashMap<String, SortedSet<String>>(); this.oldsources = new ArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.clusterId = clusterId; this.replicationTracker.registerListener(this); this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. 
int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); }
/** * Instantiation method used by region servers * * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver * @param peerClusterZnode the name of our znode * @throws IOException */ public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId) throws IOException { this.stopper = stopper; this.conf = conf; this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier * maxRetriesMultiplier); this.queue = new PriorityBlockingQueue<Path>( conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. 
this.conn = HConnectionManager.getConnection(conf); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; this.metrics = new MetricsSource(peerClusterZnode); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); this.clusterId = clusterId; this.peerClusterZnode = peerClusterZnode; this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); }
@Test public void testClaimQueues() throws Exception { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { rq.addLog("1", file); } // create 3 DummyServers Server s1 = new DummyServer("dummyserver1.example.org"); Server s2 = new DummyServer("dummyserver2.example.org"); Server s3 = new DummyServer("dummyserver3.example.org"); // create 3 DummyNodeFailoverWorkers DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker( server.getServerName().getServerName(), s1); DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker( server.getServerName().getServerName(), s2); DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker( server.getServerName().getServerName(), s3); latch = new CountDownLatch(3); // start the threads w1.start(); w2.start(); w3.start(); // make sure only one is successful int populatedMap = 0; // wait for result now... till all the workers are done. latch.await(); populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated(); assertEquals(1, populatedMap); server.abort("", null); }
@Test public void testNodeFailoverDeadServerParsing() throws Exception { LOG.debug("testNodeFailoverDeadServerParsing"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); repQueues.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { repQueues.addLog("1", file); } // create 3 DummyServers Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); SortedMap<String, SortedSet<String>> testMap = rq1.claimQueues(server.getServerName().getServerName()); ReplicationQueues rq2 = ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2); rq2.init(s2.getServerName().toString()); testMap = rq2.claimQueues(s1.getServerName().getServerName()); ReplicationQueues rq3 = ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3); rq3.init(s3.getServerName().toString()); testMap = rq3.claimQueues(s2.getServerName().getServerName()); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); List<String> result = replicationQueueInfo.getDeadRegionServers(); // verify assertTrue(result.contains(server.getServerName().getServerName())); assertTrue(result.contains(s1.getServerName().getServerName())); assertTrue(result.contains(s2.getServerName().getServerName())); server.abort("", null); }
@Test public void testLogCleaning() throws Exception{ Configuration conf = TEST_UTIL.getConfiguration(); // set TTL long ttl = 10000; conf.setLong("hbase.master.logcleaner.ttl", ttl); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); repQueues.init(server.getServerName().toString()); final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); final FileSystem fs = FileSystem.get(conf); // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files long now = System.currentTimeMillis(); fs.delete(oldLogDir, true); fs.mkdirs(oldLogDir); // Case 1: 2 invalid files, which would be deleted directly fs.createNewFile(new Path(oldLogDir, "a")); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a")); // Case 2: 1 "recent" file, not even deletable for the first log cleaner // (TimeToLiveLogCleaner), so we are not going down the chain System.out.println("Now is: " + now); for (int i = 1; i < 31; i++) { // Case 3: old files which would be deletable for the first log cleaner // (TimeToLiveLogCleaner), and also for the second (ReplicationLogCleaner) Path fileName = new Path(oldLogDir, fakeMachineName + "." 
+ (now - i) ); fs.createNewFile(fileName); // Case 4: put 3 old log files in ZK indicating that they are scheduled // for replication so these files would pass the first log cleaner // (TimeToLiveLogCleaner) but would be rejected by the second // (ReplicationLogCleaner) if (i % (30/3) == 1) { repQueues.addLog(fakeMachineName, fileName.getName()); System.out.println("Replication log file: " + fileName); } } // sleep for sometime to get newer modifcation time Thread.sleep(ttl); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now)); // Case 2: 1 newer file, not even deletable for the first log cleaner // (TimeToLiveLogCleaner), so we are not going down the chain fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) )); for (FileStatus stat : fs.listStatus(oldLogDir)) { System.out.println(stat.getPath().toString()); } assertEquals(34, fs.listStatus(oldLogDir).length); LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); cleaner.chore(); // We end up with the current log file, a newer one and the 3 old log // files which are scheduled for replication TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return 5 == fs.listStatus(oldLogDir).length; } }); for (FileStatus file : fs.listStatus(oldLogDir)) { System.out.println("Kept log files: " + file.getPath().getName()); } }
/** * Instantiation method used by region servers * * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver * @param peerClusterZnode the name of our znode * @param clusterId unique UUID for the cluster * @param replicationEndpoint the replication endpoint implementation * @param metrics metrics for replication source * @throws IOException */ @Override public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, final MetricsSource metrics) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); decorateConf(); this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queue = new PriorityBlockingQueue<Path>( this.conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; this.fs = fs; this.metrics = metrics; this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf); this.clusterId = clusterId; this.peerClusterZnode = peerClusterZnode; this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo 
parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; this.replicateContext = new ReplicationEndpoint.ReplicateContext(); }
/** * Instantiation method used by region servers * * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver * @param peerClusterZnode the name of our znode * @throws IOException */ public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); decorateConf(); this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier * maxRetriesMultiplier); this.queue = new PriorityBlockingQueue<Path>( this.conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. 
this.conn = HConnectionManager.getConnection(this.conf); long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; this.metrics = new MetricsSource(peerClusterZnode); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); this.clusterId = clusterId; this.peerClusterZnode = peerClusterZnode; this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); }
@Test public void testLogCleaning() throws Exception{ Configuration conf = TEST_UTIL.getConfiguration(); // set TTL long ttl = 10000; conf.setLong("hbase.master.logcleaner.ttl", ttl); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); repQueues.init(server.getServerName().toString()); Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); FileSystem fs = FileSystem.get(conf); LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files long now = System.currentTimeMillis(); fs.delete(oldLogDir, true); fs.mkdirs(oldLogDir); // Case 1: 2 invalid files, which would be deleted directly fs.createNewFile(new Path(oldLogDir, "a")); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a")); // Case 2: 1 "recent" file, not even deletable for the first log cleaner // (TimeToLiveLogCleaner), so we are not going down the chain System.out.println("Now is: " + now); for (int i = 1; i < 31; i++) { // Case 3: old files which would be deletable for the first log cleaner // (TimeToLiveLogCleaner), and also for the second (ReplicationLogCleaner) Path fileName = new Path(oldLogDir, fakeMachineName + "." 
+ (now - i) ); fs.createNewFile(fileName); // Case 4: put 3 old log files in ZK indicating that they are scheduled // for replication so these files would pass the first log cleaner // (TimeToLiveLogCleaner) but would be rejected by the second // (ReplicationLogCleaner) if (i % (30/3) == 1) { repQueues.addLog(fakeMachineName, fileName.getName()); System.out.println("Replication log file: " + fileName); } } // sleep for sometime to get newer modifcation time Thread.sleep(ttl); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now)); // Case 2: 1 newer file, not even deletable for the first log cleaner // (TimeToLiveLogCleaner), so we are not going down the chain fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) )); for (FileStatus stat : fs.listStatus(oldLogDir)) { System.out.println(stat.getPath().toString()); } assertEquals(34, fs.listStatus(oldLogDir).length); cleaner.chore(); // We end up with the current log file, a newer one and the 3 old log // files which are scheduled for replication assertEquals(5, fs.listStatus(oldLogDir).length); for (FileStatus file : fs.listStatus(oldLogDir)) { System.out.println("Kept log files: " + file.getPath().getName()); } }
/**
 * Initializer for the source.
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the replication queues handle made available to the source
 * @param replicationPeers the replication peers handle made available to the source
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode presumably the name of the peer cluster znode this source serves --
 *          confirm against implementations
 * @param clusterId presumably the id of the local cluster -- confirm against callers
 * @param replicationEndpoint the replication endpoint handed to the source
 * @param metrics the metrics object handed to the source
 * @throws IOException if the implementation fails to initialize
 */
public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, final MetricsSource metrics) throws IOException;
/**
 * Initializer for the source.
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the replication queues handle made available to the source
 * @param replicationPeers the replication peers handle made available to the source
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode presumably the name of the peer cluster znode this source serves --
 *          confirm against implementations
 * @param clusterId presumably the id of the local cluster -- confirm against callers
 * @throws IOException if the implementation fails to initialize
 */
public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId) throws IOException;