/**
 * Tests that the peer configuration used by ReplicationAdmin contains all
 * the peer's properties.
 * <p>
 * The peer is removed in a {@code finally} block so that a failing assertion does not leave
 * {@code ID_ONE} registered for whatever runs afterwards.
 */
@Test
public void testPeerConfig() throws Exception {
  ReplicationPeerConfig config = new ReplicationPeerConfig();
  config.setClusterKey(KEY_ONE);
  config.getConfiguration().put("key1", "value1");
  config.getConfiguration().put("key2", "value2");
  admin.addPeer(ID_ONE, config, null);
  try {
    List<ReplicationPeer> peers = admin.listReplicationPeers();
    // Guard against a null result so a missing peer fails as an assertion, not as an NPE.
    assertNotNull(peers);
    assertEquals(1, peers.size());
    ReplicationPeer peerOne = peers.get(0);
    assertNotNull(peerOne);
    assertEquals("value1", peerOne.getConfiguration().get("key1"));
    assertEquals("value2", peerOne.getConfiguration().get("key2"));
  } finally {
    // Always clean up the peer added above, even when an assertion failed.
    admin.removePeer(ID_ONE);
  }
}
/**
 * Builds a {@link ReplicationPeer} for every peer id returned by {@code listPeerConfigs()}.
 * <p>
 * A peer that fails with a {@link ReplicationException} is logged (summary at WARN, full
 * exception at DEBUG) and left out of the result; the remaining peers are still processed.
 * @return the peers that could be built, or {@code null} when there are no peer configs
 */
@VisibleForTesting
List<ReplicationPeer> listReplicationPeers() {
  Map<String, ReplicationPeerConfig> peerConfigs = listPeerConfigs();
  if (peerConfigs == null || peerConfigs.isEmpty()) {
    return null;
  }

  List<ReplicationPeer> result = new ArrayList<ReplicationPeer>(peerConfigs.size());
  // Only the ids are needed here; the config itself is re-read through replicationPeers.
  for (String peerId : peerConfigs.keySet()) {
    try {
      Pair<ReplicationPeerConfig, Configuration> confPair =
          this.replicationPeers.getPeerConf(peerId);
      Configuration peerConf = confPair.getSecond();
      result.add(new ReplicationPeerZKImpl(peerConf, peerId, confPair.getFirst(),
          parseTableCFsFromConfig(this.getPeerTableCFs(peerId))));
    } catch (ReplicationException e) {
      LOG.warn("Failed to get valid replication peers. "
          + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
          + e.getMessage());
      LOG.debug("Failure details to get valid replication peers.", e);
    }
  }
  return result;
}
/** * Add sources for the given peer cluster on this region server. For the newly added peer, we only * need to enqueue the latest log of each wal group and do replication * @param id the id of the peer cluster * @return the source that was created * @throws IOException */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>(); this.walsById.put(id, walsByGroup); // Add the latest wal to that source's queue synchronized (latestPaths) { if (this.latestPaths.size() > 0) { for (Path logPath : latestPaths) { String name = logPath.getName(); String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name); SortedSet<String> logs = new TreeSet<String>(); logs.add(name); walsByGroup.put(walPrefix, logs); try { this.replicationQueues.addLog(id, name); } catch (ReplicationException e) { String message = "Cannot add log to queue when creating a new source, queueId=" + id + ", filename=" + name; server.stop(message); throw e; } src.enqueueLog(logPath); } } } } src.startup(); return src; }
/** * Add a new normal source to this region server * @param id the id of the peer cluster * @return the source that was created * @throws IOException */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); this.walsById.put(id, new TreeSet<String>()); // Add the latest wal to that source's queue if (this.latestPath != null) { String name = this.latestPath.getName(); this.walsById.get(id).add(name); try { this.replicationQueues.addLog(src.getPeerClusterZnode(), name); } catch (ReplicationException e) { String message = "Cannot add log to queue when creating a new source, queueId=" + src.getPeerClusterZnode() + ", filename=" + name; server.stop(message); throw e; } src.enqueueLog(this.latestPath); } } src.startup(); return src; }
/**
 * Looks up the configuration of the peer identified by {@code peerId} and returns the
 * ZooKeeper cluster key derived from it.
 * <p>
 * A short-lived {@link ZooKeeperWatcher} is opened for the lookup and always closed again
 * before this method returns.
 * @param conf the configuration used to open the local ZooKeeper watcher
 * @return the ZooKeeper cluster key of the peer cluster
 * @throws IOException if no peer configuration is found, or wrapping the
 *           {@link ReplicationException} raised during the lookup
 */
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", new Abortable() {
      @Override
      public void abort(String why, Throwable e) {
        // Intentionally a no-op: this watcher only serves a one-off lookup.
      }

      @Override
      public boolean isAborted() {
        return false;
      }
    });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Configuration peerConf = rp.getPeerConf(peerId);
    if (peerConf == null) {
      throw new IOException("Couldn't get peer conf!");
    }
    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occurred while trying to connect to the remote peer cluster", e);
  } finally {
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
/** * Factory method to create a replication source * @param queueId the id of the replication queue * @return the created source */ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer) throws IOException { ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); MetricsSource metrics = new MetricsSource(queueId); // init replication source src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, walFileLengthProvider, metrics); return src; }
/** * Add a normal source for the given peer on this region server. Meanwhile, add new replication * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal * group and do replication * @param peerId the id of the replication peer * @return the source that was created */ @VisibleForTesting ReplicationSourceInterface addSource(String peerId) throws IOException { ReplicationPeer peer = replicationPeers.getPeer(peerId); ReplicationSourceInterface src = createSource(peerId, peer); // synchronized on latestPaths to avoid missing the new log synchronized (this.latestPaths) { this.sources.put(peerId, src); Map<String, SortedSet<String>> walsByGroup = new HashMap<>(); this.walsById.put(peerId, walsByGroup); // Add the latest wal to that source's queue if (this.latestPaths.size() > 0) { for (Path logPath : latestPaths) { String name = logPath.getName(); String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); SortedSet<String> logs = new TreeSet<>(); logs.add(name); walsByGroup.put(walPrefix, logs); // Abort RS and throw exception to make add peer failed abortAndThrowIOExceptionWhenFail( () -> this.queueStorage.addWAL(server.getServerName(), peerId, name)); src.enqueueLog(logPath); } } } src.startup(); return src; }
/**
 * Runs the superclass initialization with the unchanged arguments and then records the peer
 * id reported by {@code replicationQueueInfo} as {@code actualPeerId}.
 * @throws IOException if the superclass initialization throws it
 */
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
  super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
      clusterId, walFileLengthProvider, metrics);
  // replicationQueueInfo is read only after super.init has returned
  this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
/** * Instantiation method used by region servers * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param server the server for this region server * @param queueId the id of our replication queue * @param clusterId unique UUID for the cluster * @param metrics metrics for replication source */ @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); this.queueStorage = queueStorage; this.replicationPeer = replicationPeer; this.manager = manager; this.fs = fs; this.metrics = metrics; this.clusterId = clusterId; this.queueId = queueId; this.replicationQueueInfo = new ReplicationQueueInfo(queueId); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId + ", 
currentBandwidth=" + this.currentBandwidth); }
/**
 * Initialization that always fails: no argument is read, and an {@link IOException} with the
 * message "Failing deliberately" is thrown unconditionally.
 * @throws IOException always
 */
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
    throws IOException {
  throw new IOException("Failing deliberately");
}
/** * Connect to peer and check the table descriptor on peer: * <ol> * <li>Create the same table on peer when not exist.</li> * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li> * </ol> * @param tableName name of the table to sync to the peer * @param splits table split keys * @throws IOException */ private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) throws IOException { List<ReplicationPeer> repPeers = listReplicationPeers(); if (repPeers == null || repPeers.size() <= 0) { throw new IllegalArgumentException("Found no peer cluster for replication."); } final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString()); for (ReplicationPeer repPeer : repPeers) { Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs(); // TODO Currently peer TableCFs will not include namespace so we need to check only for table // name without namespace in it. Need to correct this logic once we fix HBASE-11386. if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) { continue; } Configuration peerConf = repPeer.getConfiguration(); HTableDescriptor htd = null; try (Connection conn = ConnectionFactory.createConnection(peerConf); Admin admin = this.connection.getAdmin(); Admin repHBaseAdmin = conn.getAdmin()) { htd = admin.getTableDescriptor(tableName); HTableDescriptor peerHtd = null; if (!repHBaseAdmin.tableExists(tableName)) { repHBaseAdmin.createTable(htd, splits); } else { peerHtd = repHBaseAdmin.getTableDescriptor(tableName); if (peerHtd == null) { throw new IllegalArgumentException("Failed to get table descriptor for table " + tableName.getNameAsString() + " from peer cluster " + repPeer.getId()); } else if (!peerHtd.equals(htd)) { throw new IllegalArgumentException("Table " + tableName.getNameAsString() + " exists in peer cluster " + repPeer.getId() + ", but the table descriptors are not same when comapred with source cluster." 
+ " Thus can not enable the table's replication switch."); } } } } }
/** * Sets up the actual job. * * @param conf The current configuration. * @param args The command line parameters. * @return The newly created job. * @throws java.io.IOException When setting up the job fails. */ public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException { if (!doCommandLine(args)) { return null; } if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) { throw new IOException("Replication needs to be enabled to verify it."); } HConnectionManager.execute(new HConnectable<Void>(conf) { @Override public Void connect(HConnection conn) throws IOException { try { ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, conn.getZooKeeperWatcher()); // Just verifying it we can connect ReplicationPeer peer = zk.getPeer(peerId); if (peer == null) { throw new IOException("Couldn't get access to the slave cluster," + "please see the log"); } } catch (KeeperException ex) { throw new IOException("Couldn't get access to the slave cluster" + " because: ", ex); } return null; } }); conf.set(NAME+".peerId", peerId); conf.set(NAME+".tableName", tableName); conf.setLong(NAME+".startTime", startTime); conf.setLong(NAME+".endTime", endTime); if (families != null) { conf.set(NAME+".families", families); } Job job = new Job(conf, NAME + "_" + tableName); job.setJarByClass(VerifyReplication.class); Scan scan = new Scan(); if (startTime != 0) { scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); } if(families != null) { String[] fams = families.split(","); for(String fam : fams) { scan.addFamily(Bytes.toBytes(fam)); } } TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); return job; }
/**
 * Initializer for the source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param queueStorage the {@link ReplicationQueueStorage} passed to the source
 * @param replicationPeer the {@link ReplicationPeer} passed to the source
 * @param server the server for this region server
 * @param queueId the id of the replication queue
 * @param clusterId the {@link UUID} passed to the source; presumably the id of the local
 *          cluster - TODO confirm
 * @param walFileLengthProvider the {@link WALFileLengthProvider} passed to the source
 * @param metrics the {@link MetricsSource} passed to the source
 * @throws IOException if the implementation fails to initialize
 */
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException;