/** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use * @param server the server for this region server * @param fs the file system to use * @param logDir the directory that contains all wal directories of live RSs * @param oldLogDir the directory where old logs are archived * @param clusterId */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final Configuration conf, final Server server, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; this.walsById = new HashMap<String, Map<String, SortedSet<String>>>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>(); this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds this.clusterId = clusterId; this.replicationTracker.registerListener(this); this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); this.latestPaths = Collections.synchronizedSet(new HashSet<Path>()); }
/** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use * @param server the server for this region server * @param fs the file system to use * @param logDir the directory that contains all wal directories of live RSs * @param oldLogDir the directory where old logs are archived * @param clusterId */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final Configuration conf, final Server server, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; this.walsById = new HashMap<String, SortedSet<String>>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>(); this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds this.clusterId = clusterId; this.replicationTracker.registerListener(this); this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); }
/** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived * @param clusterId */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { this.sources = new ArrayList<ReplicationSourceInterface>(); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.stopper = stopper; this.hlogsById = new HashMap<String, SortedSet<String>>(); this.oldsources = new ArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.clusterId = clusterId; this.replicationTracker.registerListener(this); this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); }
public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { ReplicationQueueStorage queueStorage; ReplicationTracker replicationTracker; StringBuilder sb = new StringBuilder(); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(), new WarnOnlyStoppable()); Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); // Loops each peer on each RS and dumps the queues List<ServerName> regionservers = queueStorage.getListOfReplicators(); if (regionservers == null || regionservers.isEmpty()) { return sb.toString(); } for (ServerName regionserver : regionservers) { List<String> queueIds = queueStorage.getAllQueues(regionserver); if (!liveRegionServers.contains(regionserver.getServerName())) { deadRegionServers.add(regionserver.getServerName()); } for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); if (!peerIds.contains(queueInfo.getPeerId())) { deletedQueues.add(regionserver + "/" + queueId); sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); } else { sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); } } } return sb.toString(); }
/** * Creates a replication manager and sets the watch on all the other registered region servers * @param queueStorage the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use * @param server the server for this region server * @param fs the file system to use * @param logDir the directory that contains all wal directories of live RSs * @param oldLogDir the directory where old logs are archived * @param clusterId */ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider) throws IOException { // CopyOnWriteArrayList is thread-safe. // Generally, reading is more than modifying. this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; this.walsById = new ConcurrentHashMap<>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap<>(); this.oldsources = new ArrayList<>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 // seconds this.clusterId = clusterId; this.walFileLengthProvider = walFileLengthProvider; this.replicationTracker.registerListener(this); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.latestPaths = new HashSet<Path>(); replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT); connection = ConnectionFactory.createConnection(conf); }