/**
 * Creates the replication source object for the given queue.
 * <p>
 * The implementation class name is read from the
 * {@code replication.replicationsource.implementation} configuration key. When the key is unset,
 * {@link RecoveredReplicationSource} is used for a recovered queue and {@link ReplicationSource}
 * otherwise. If the configured class cannot be loaded, is not a
 * {@link ReplicationSourceInterface}, or cannot be instantiated through its no-argument
 * constructor, a warning is logged and the same built-in default is returned instead.
 * @param conf configuration that may name a custom implementation class
 * @param queueId id of the replication queue; parsed to decide whether the queue is recovered
 * @return a freshly constructed source; this method does not call {@code init} on it
 */
static ReplicationSourceInterface create(Configuration conf, String queueId) {
  ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
  boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
  ReplicationSourceInterface src;
  try {
    String defaultReplicationSourceImpl = isQueueRecovered
      ? RecoveredReplicationSource.class.getCanonicalName()
      : ReplicationSource.class.getCanonicalName();
    // asSubclass rejects an incompatible class before any instance of it is constructed.
    Class<? extends ReplicationSourceInterface> c = Class
      .forName(conf.get("replication.replicationsource.implementation",
        defaultReplicationSourceImpl))
      .asSubclass(ReplicationSourceInterface.class);
    // Class.newInstance() is deprecated; go through the no-arg constructor explicitly.
    src = c.getDeclaredConstructor().newInstance();
  } catch (Exception e) {
    // Deliberately broad: any failure to load or build the configured class falls back to the
    // built-in implementation instead of failing the caller.
    LOG.warn("Passed replication source implementation throws errors, "
      + "defaulting to ReplicationSource", e);
    src = isQueueRecovered ? new RecoveredReplicationSource() : new ReplicationSource();
  }
  return src;
}
/**
 * Verifies that nothing is left in queue storage for the given peer.
 * <p>
 * Every queue of every replicator is inspected first; afterwards the hfile-refs peers are
 * checked. The first leftover that is found aborts the check with an exception.
 * @param peerId id of the peer that must not own any queue
 * @throws ReplicationException propagated from the queue storage calls
 * @throws DoNotRetryIOException if a queue or an hfile-refs entry for the peer still exists
 */
private void checkQueuesDeleted(String peerId)
  throws ReplicationException, DoNotRetryIOException {
  for (ServerName replicator : queueStorage.getListOfReplicators()) {
    for (String queueId : queueStorage.getAllQueues(replicator)) {
      String owningPeerId = new ReplicationQueueInfo(queueId).getPeerId();
      if (!owningPeerId.equals(peerId)) {
        continue;
      }
      throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId
        + ", replicator: " + replicator + ", queueId: " + queueId);
    }
  }

  boolean leftInHFileRefs = queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId);
  if (leftInHFileRefs) {
    throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
  }
}
/**
 * Collects the replication queues whose peer id is not among the ids listed by the peer storage.
 * <p>
 * Each hit is also logged at debug level.
 * @return map from replicator to the ids of its queues that belong to an unknown peer; empty when
 *         no such queue exists
 * @throws ReplicationException propagated from the peer or queue storage calls
 */
private Map<ServerName, List<String>> getUnDeletedQueues() throws ReplicationException {
  Map<ServerName, List<String>> orphansByReplicator = new HashMap<>();
  Set<String> knownPeerIds = new HashSet<>(peerStorage.listPeerIds());
  for (ServerName replicator : queueStorage.getListOfReplicators()) {
    for (String queueId : queueStorage.getAllQueues(replicator)) {
      String owningPeerId = new ReplicationQueueInfo(queueId).getPeerId();
      if (knownPeerIds.contains(owningPeerId)) {
        continue;
      }
      List<String> orphans = orphansByReplicator.get(replicator);
      if (orphans == null) {
        orphans = new ArrayList<>();
        orphansByReplicator.put(replicator, orphans);
      }
      orphans.add(queueId);
      LOG.debug(
        "Undeleted replication queue for removed peer found: "
          + "[removedPeerId={}, replicator={}, queueId={}]",
        owningPeerId, replicator, queueId);
    }
  }
  return orphansByReplicator;
}
/**
 * Refreshes the cached sets of undeleted queues and undeleted hfile-refs peers and reports each
 * entry to the error reporter with code {@code UNDELETED_REPLICATION_QUEUE}.
 * @throws ReplicationException propagated from the lookups of undeleted queues and peers
 */
public void checkUnDeletedQueues() throws ReplicationException {
  undeletedQueueIds = getUnDeletedQueues();
  for (Map.Entry<ServerName, List<String>> entry : undeletedQueueIds.entrySet()) {
    ServerName replicator = entry.getKey();
    for (String queueId : entry.getValue()) {
      ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
      String details = String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
        queueInfo.getPeerId(), replicator, queueId);
      String msg = "Undeleted replication queue for removed peer found: " + details;
      errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
        msg);
    }
  }

  undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
  for (String peerId : undeletedHFileRefsPeerIds) {
    String msg = "Undeleted replication hfile-refs queue for removed peer " + peerId + " found";
    errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
      msg);
  }
}
/**
 * Instantiation method used by region servers.
 * <p>
 * Stores the collaborators on this source, derives a configuration from the one passed in, reads
 * the replication tunables from it and parses the peer id out of the znode name.
 *
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param replicationQueues replication queues handle kept by this source
 * @param replicationPeers replication peers handle kept by this source
 * @param stopper the atomic boolean to use to stop the regionserver
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for replication source
 * @throws IOException if initialization fails
 */
@Override
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics) throws IOException {
  this.stopper = stopper;
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();

  // Tunables are read only after decorateConf() has run.
  final Configuration settings = this.conf;
  this.replicationQueueSizeCapacity =
    settings.getLong("replication.source.size.capacity", 64 * 1024 * 1024);
  this.replicationQueueNbCapacity = settings.getInt("replication.source.nb.capacity", 25000);
  // 1 second
  this.sleepForRetries = settings.getLong("replication.source.sleepforretries", 1000);
  // 5 minutes @ 1 sec per
  this.maxRetriesMultiplier = settings.getInt("replication.source.maxretriesmultiplier", 300);
  this.queueSizePerGroup = settings.getInt("hbase.regionserver.maxlogs", 32);
  this.logQueueWarnThreshold = settings.getInt("replication.source.log.queue.warn", 2);
  final long bandwidth = settings.getLong("replication.source.per.peer.node.bandwidth", 0);
  this.throttler = new ReplicationThrottler(bandwidth / 10.0);

  // Collaborators handed in by the caller.
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.replicationEndpoint = replicationEndpoint;
  this.manager = manager;
  this.fs = fs;
  this.metrics = metrics;
  this.clusterId = clusterId;

  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerClusterZnode = peerClusterZnode;
  final ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(peerClusterZnode);
  this.replicationQueueInfo = queueInfo;
  this.peerId = queueInfo.getPeerId();
}
/**
 * Creates a worker bound to one WAL group of a replication source.
 *
 * @param walGroupId id of the WAL group this worker handles
 * @param queue queue of WAL paths for that group
 * @param replicationQueueInfo parsed information about the replication queue
 * @param source the replication source this worker belongs to
 */
public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
    ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
  this.source = source;
  this.replicationQueueInfo = replicationQueueInfo;
  this.walGroupId = walGroupId;
  this.queue = queue;
  // fs and conf are not constructor arguments; they resolve to names from the surrounding scope.
  this.repLogReader = new ReplicationWALReaderManager(fs, conf);
}
public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { ReplicationQueueStorage queueStorage; ReplicationTracker replicationTracker; StringBuilder sb = new StringBuilder(); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(), new WarnOnlyStoppable()); Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); // Loops each peer on each RS and dumps the queues List<ServerName> regionservers = queueStorage.getListOfReplicators(); if (regionservers == null || regionservers.isEmpty()) { return sb.toString(); } for (ServerName regionserver : regionservers) { List<String> queueIds = queueStorage.getAllQueues(regionserver); if (!liveRegionServers.contains(regionserver.getServerName())) { deadRegionServers.add(regionserver.getServerName()); } for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); if (!peerIds.contains(queueInfo.getPeerId())) { deletedQueues.add(regionserver + "/" + queueId); sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); } else { sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); } } } return sb.toString(); }
/**
 * Renders one replication queue as human-readable text and adds its WAL count to the per-peer
 * total kept in {@code peersQueueSize}.
 *
 * @param regionserver the replicator that owns the queue
 * @param queueStorage storage used to look up the replication position of each WAL
 * @param queueInfo parsed form of {@code queueId} (peer id, recovered flag, dead servers)
 * @param queueId id of the queue; also the key under which WAL positions are looked up
 * @param wals names of the WALs in the queue
 * @param isDeleted whether the queue belongs to a peer that no longer exists
 * @param hdfs when true, the total size of the WALs on the file system is appended
 * @return the formatted description, one item per line
 * @throws Exception propagated from the storage and file system calls
 */
private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
    ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
    boolean hdfs) throws Exception {
  StringBuilder sb = new StringBuilder();

  sb.append("Dumping replication queue info for RegionServer: [").append(regionserver)
    .append("]").append("\n");
  sb.append(" Queue znode: ").append(queueId).append("\n");
  sb.append(" PeerID: ").append(queueInfo.getPeerId()).append("\n");
  sb.append(" Recovered: ").append(queueInfo.isQueueRecovered()).append("\n");

  List<ServerName> deadServers = queueInfo.getDeadRegionServers();
  if (deadServers.isEmpty()) {
    sb.append(" No dead RegionServers found in this queue.").append("\n");
  } else {
    sb.append(" Dead RegionServers: ").append(deadServers).append("\n");
  }

  sb.append(" Was deleted: ").append(isDeleted).append("\n");
  sb.append(" Number of WALs in replication queue: ").append(wals.size()).append("\n");
  peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

  for (String wal : wals) {
    // The WALs were listed by queue id, so the position must be looked up under the same queue
    // id. For a recovered queue the queue id differs from the bare peer id.
    long position = queueStorage.getWALPosition(regionserver, queueId, wal);
    sb.append(" Replication position for ").append(wal).append(": ")
      .append(position > 0 ? String.valueOf(position)
        : "0" + " (not started or nothing to replicate)")
      .append("\n");
  }

  if (hdfs) {
    FileSystem fs = FileSystem.get(getConf());
    sb.append(" Total size of WALs on HDFS for this queue: ")
      .append(StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver))).append("\n");
  }
  return sb.toString();
}
/** * Instantiation method used by region servers * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param server the server for this region server * @param queueId the id of our replication queue * @param clusterId unique UUID for the cluster * @param metrics metrics for replication source */ @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); this.queueStorage = queueStorage; this.replicationPeer = replicationPeer; this.manager = manager; this.fs = fs; this.metrics = metrics; this.clusterId = clusterId; this.queueId = queueId; this.replicationQueueInfo = new ReplicationQueueInfo(queueId); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId + ", 
currentBandwidth=" + this.currentBandwidth); }
/** * Instantiation method used by region servers * * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver * @param peerClusterZnode the name of our znode * @throws IOException */ public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId) throws IOException { this.stopper = stopper; this.conf = conf; this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier * maxRetriesMultiplier); this.queue = new PriorityBlockingQueue<Path>( conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. 
this.conn = HConnectionManager.getConnection(conf); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; this.metrics = new MetricsSource(peerClusterZnode); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); this.clusterId = clusterId; this.peerClusterZnode = peerClusterZnode; this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); }
@Test public void testNodeFailoverDeadServerParsing() throws Exception { LOG.debug("testNodeFailoverDeadServerParsing"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); repQueues.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { repQueues.addLog("1", file); } // create 3 DummyServers Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); SortedMap<String, SortedSet<String>> testMap = rq1.claimQueues(server.getServerName().getServerName()); ReplicationQueues rq2 = ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2); rq2.init(s2.getServerName().toString()); testMap = rq2.claimQueues(s1.getServerName().getServerName()); ReplicationQueues rq3 = ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3); rq3.init(s3.getServerName().toString()); testMap = rq3.claimQueues(s2.getServerName().getServerName()); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); List<String> result = replicationQueueInfo.getDeadRegionServers(); // verify assertTrue(result.contains(server.getServerName().getServerName())); assertTrue(result.contains(s1.getServerName().getServerName())); assertTrue(result.contains(s2.getServerName().getServerName())); server.abort("", null); }
/** * Instantiation method used by region servers * * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver * @param peerClusterZnode the name of our znode * @param clusterId unique UUID for the cluster * @param replicationEndpoint the replication endpoint implementation * @param metrics metrics for replication source * @throws IOException */ @Override public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, final MetricsSource metrics) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); decorateConf(); this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queue = new PriorityBlockingQueue<Path>( this.conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; this.fs = fs; this.metrics = metrics; this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf); this.clusterId = clusterId; this.peerClusterZnode = peerClusterZnode; this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo 
parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; this.replicateContext = new ReplicationEndpoint.ReplicateContext(); }
/** * Instantiation method used by region servers * * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver * @param peerClusterZnode the name of our znode * @throws IOException */ public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); decorateConf(); this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier * maxRetriesMultiplier); this.queue = new PriorityBlockingQueue<Path>( this.conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. 
this.conn = HConnectionManager.getConnection(this.conf); long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; this.metrics = new MetricsSource(peerClusterZnode); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); this.clusterId = clusterId; this.peerClusterZnode = peerClusterZnode; this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); }
@Test public void testNodeFailoverDeadServerParsing() throws Exception { Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); ReplicationQueueStorage queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { queueStorage.addWAL(server.getServerName(), "1", file); } // create 3 DummyServers Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially ServerName serverName = server.getServerName(); List<String> unclaimed = queueStorage.getAllQueues(serverName); queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName()); queueStorage.removeReplicatorIfQueueIsEmpty(serverName); serverName = s1.getServerName(); unclaimed = queueStorage.getAllQueues(serverName); queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName()); queueStorage.removeReplicatorIfQueueIsEmpty(serverName); serverName = s2.getServerName(); unclaimed = queueStorage.getAllQueues(serverName); String queue3 = queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst(); queueStorage.removeReplicatorIfQueueIsEmpty(serverName); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3); List<ServerName> result = replicationQueueInfo.getDeadRegionServers(); // verify assertTrue(result.contains(server.getServerName())); assertTrue(result.contains(s1.getServerName())); assertTrue(result.contains(s2.getServerName())); server.stop(""); }