/** * Create replication peer for replicating to region replicas if needed. * @param conf configuration to use * @throws IOException */ public static void setupRegionReplicaReplication(Configuration conf) throws IOException { if (!isRegionReplicaReplicationEnabled(conf)) { return; } ReplicationAdmin repAdmin = new ReplicationAdmin(conf); try { if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) { ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf)); peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()); repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null); } } catch (ReplicationException ex) { throw new IOException(ex); } finally { repAdmin.close(); } }
@BeforeClass public static void setUpBeforeClass() throws Exception { conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniCluster(); admin = new ReplicationAdmin(conf1); conf2 = HBaseConfiguration.create(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); conf2.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2182); utility2 = new HBaseTestingUtility(conf2); utility2.startMiniCluster(); ReplicationPeerConfig config = new ReplicationPeerConfig(); config.setClusterKey(utility2.getClusterKey()); admin.addPeer(peerId, config, null); HTableDescriptor table = new HTableDescriptor(tableName); HColumnDescriptor fam = new HColumnDescriptor(famName); fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); table.addFamily(fam); utility1.getHBaseAdmin().createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); utility1.waitUntilAllRegionsAssigned(tableName); }
/** * Tests that the peer configuration used by ReplicationAdmin contains all * the peer's properties. */ @Test public void testPeerConfig() throws Exception { ReplicationPeerConfig config = new ReplicationPeerConfig(); config.setClusterKey(KEY_ONE); config.getConfiguration().put("key1", "value1"); config.getConfiguration().put("key2", "value2"); admin.addPeer(ID_ONE, config, null); List<ReplicationPeer> peers = admin.listReplicationPeers(); assertEquals(1, peers.size()); ReplicationPeer peerOne = peers.get(0); assertNotNull(peerOne); assertEquals("value1", peerOne.getConfiguration().get("key1")); assertEquals("value2", peerOne.getConfiguration().get("key2")); admin.removePeer(ID_ONE); }
@VisibleForTesting List<ReplicationPeer> listReplicationPeers() { Map<String, ReplicationPeerConfig> peers = listPeerConfigs(); if (peers == null || peers.size() <= 0) { return null; } List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size()); for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) { String peerId = peerEntry.getKey(); try { Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId); Configuration peerConf = pair.getSecond(); ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(), parseTableCFsFromConfig(this.getPeerTableCFs(peerId))); listOfPeers.add(peer); } catch (ReplicationException e) { LOG.warn("Failed to get valid replication peers. " + "Error connecting to peer cluster with peerId=" + peerId + ". Error message=" + e.getMessage()); LOG.debug("Failure details to get valid replication peers.", e); continue; } } return listOfPeers; }
public void upgrade() throws Exception { try (Connection conn = ConnectionFactory.createConnection(conf)) { Admin admin = conn.getAdmin(); admin.listReplicationPeers().forEach((peerDesc) -> { String peerId = peerDesc.getPeerId(); ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { peerConfig.setReplicateAllUserTables(false); try { admin.updateReplicationPeerConfig(peerId, peerConfig); } catch (Exception e) { LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e); } } }); } }
public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception { Map<String, String> currentConf; StringBuilder sb = new StringBuilder(); for (ReplicationPeerDescription peer : peers) { ReplicationPeerConfig peerConfig = peer.getPeerConfig(); sb.append("Peer: " + peer.getPeerId() + "\n"); sb.append(" " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n"); sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n"); sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n"); currentConf = peerConfig.getConfiguration(); // Only show when we have a custom configuration for the peer if (currentConf.size() > 1) { sb.append(" " + "Peer Configuration: " + currentConf + "\n"); } sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n"); sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n"); } return sb.toString(); }
@Override public void updatePeerConfig(String peerId) throws ReplicationException, IOException { Lock peerLock = peersLock.acquireLock(peerId); try { ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); if (peer == null) { throw new ReplicationException("Peer with id=" + peerId + " is not cached."); } ReplicationPeerConfig oldConfig = peer.getPeerConfig(); ReplicationPeerConfig newConfig = replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); // RS need to start work with the new replication config change if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) { replicationSourceManager.refreshSources(peerId); } } finally { peerLock.unlock(); } }
public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { checkPeerConfig(peerConfig); ReplicationPeerDescription desc = checkPeerExists(peerId); ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) { throw new DoNotRetryIOException( "Changing the cluster key on an existing peer is not allowed. Existing key '" + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + peerConfig.getClusterKey() + "'"); } if (!isStringEquals(peerConfig.getReplicationEndpointImpl(), oldPeerConfig.getReplicationEndpointImpl())) { throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + "on an existing peer is not allowed. Existing class '" + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); } }
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { String filterCSV = peerConfig.getConfiguration() .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); if (filterCSV != null && !filterCSV.isEmpty()) { String[] filters = filterCSV.split(","); for (String filter : filters) { try { Class.forName(filter).newInstance(); } catch (Exception e) { throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + " could not be created. Failing add/update " + "peer operation.", e); } } } }
/** * Add a peer and wait for it to initialize * @param peerId * @param peerConfig * @param waitForSource Whether to wait for replication source to initialize * @throws Exception */ private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, final boolean waitForSource) throws Exception { final ReplicationPeers rp = manager.getReplicationPeers(); rp.getPeerStorage().addPeer(peerId, peerConfig, true); try { manager.addPeer(peerId); } catch (Exception e) { // ignore the failed exception, because we'll test both success & failed case. } waitPeer(peerId, manager, waitForSource); if (managerOfCluster != null) { managerOfCluster.addPeer(peerId); waitPeer(peerId, managerOfCluster, waitForSource); } }
@Test(timeout=300000) public void testReplicationPeerConfigUpdateCallback() throws Exception { String peerId = "1"; ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName()); rpc.getConfiguration().put("key1", "value1"); admin1.addReplicationPeer(peerId, rpc); rpc.getConfiguration().put("key1", "value2"); admin.updatePeerConfig(peerId, rpc); if (!TestUpdatableReplicationEndpoint.hasCalledBack()) { synchronized(TestUpdatableReplicationEndpoint.class) { TestUpdatableReplicationEndpoint.class.wait(2000L); } } assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack()); }
@BeforeClass public static void setUpBeforeClass() throws Exception { utility1 = new HBaseTestingUtility(conf1); utility1.startMiniCluster(); admin = new ReplicationAdmin(conf1); conf2 = HBaseConfiguration.create(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); conf2.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2182); utility2 = new HBaseTestingUtility(conf2); utility2.startMiniCluster(); ReplicationPeerConfig config = new ReplicationPeerConfig(); config.setClusterKey(utility2.getClusterKey()); admin.addPeer(peerId, config, null); HTableDescriptor table = new HTableDescriptor(tableName); HColumnDescriptor fam = new HColumnDescriptor(famName); fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); table.addFamily(fam); utility1.getAdmin().createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); utility1.waitUntilAllRegionsAssigned(tableName); }
@Override public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, Map<TableName, List<String>> tableCfs) { if (tableCfs == null) { return failedFuture(new ReplicationException("tableCfs is null")); } CompletableFuture<Void> future = new CompletableFuture<Void>(); getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { if (!completeExceptionally(future, error)) { ReplicationPeerConfig newPeerConfig = ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> { if (!completeExceptionally(future, error)) { future.complete(result); } }); } }); return future; }
/** * Tests that the peer configuration used by ReplicationAdmin contains all * the peer's properties. */ @Test public void testPeerConfig() throws Exception { ReplicationPeerConfig config = new ReplicationPeerConfig(); config.setClusterKey(KEY_ONE); config.getConfiguration().put("key1", "value1"); config.getConfiguration().put("key2", "value2"); hbaseAdmin.addReplicationPeer(ID_ONE, config); List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers(); assertEquals(1, peers.size()); ReplicationPeerDescription peerOne = peers.get(0); assertNotNull(peerOne); assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); hbaseAdmin.removeReplicationPeer(ID_ONE); }
/** * basic checks that when we add a peer that it is enabled, and that we can disable * @throws Exception */ @Test public void testEnableDisable() throws Exception { ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); rpc1.setClusterKey(KEY_ONE); admin.addPeer(ID_ONE, rpc1, null); assertEquals(1, admin.getPeersCount()); assertTrue(admin.getPeerState(ID_ONE)); admin.disablePeer(ID_ONE); assertFalse(admin.getPeerState(ID_ONE)); try { admin.getPeerState(ID_SECOND); } catch (ReplicationPeerNotFoundException e) { // OK! } admin.removePeer(ID_ONE); }
@Test public void testSetReplicateAllUserTables() throws Exception { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(KEY_ONE); hbaseAdmin.addReplicationPeer(ID_ONE, rpc); rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); assertTrue(rpc.replicateAllUserTables()); rpc.setReplicateAllUserTables(false); hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); assertFalse(rpc.replicateAllUserTables()); rpc.setReplicateAllUserTables(true); hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); assertTrue(rpc.replicateAllUserTables()); hbaseAdmin.removeReplicationPeer(ID_ONE); }
@BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); TEST_UTIL.startMiniCluster(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); TEST_UTIL2 = new HBaseTestingUtility(conf2); TEST_UTIL2.startMiniCluster(); admin2 = ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin(); ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(TEST_UTIL2.getClusterKey()); ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join(); }
@Test public void testPeerConfig() throws Exception { ReplicationPeerConfig config = new ReplicationPeerConfig(); config.setClusterKey(KEY_ONE); config.getConfiguration().put("key1", "value1"); config.getConfiguration().put("key2", "value2"); admin.addReplicationPeer(ID_ONE, config).join(); List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); assertEquals(1, peers.size()); ReplicationPeerDescription peerOne = peers.get(0); assertNotNull(peerOne); assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); admin.removeReplicationPeer(ID_ONE).join(); }
/** * @param bytes Content of a peer znode. * @return ClusterKey parsed from the passed bytes. * @throws DeserializationException */ public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) throws DeserializationException { if (ProtobufUtil.isPBMagicPrefix(bytes)) { int pbLen = ProtobufUtil.lengthOfPBMagic(); ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder(); ReplicationProtos.ReplicationPeer peer; try { ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); peer = builder.build(); } catch (IOException e) { throw new DeserializationException(e); } return convert(peer); } else { if (bytes == null || bytes.length <= 0) { throw new DeserializationException("Bytes to deserialize should not be empty."); } return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); } }
/** * Returns the configuration needed to talk to the remote slave cluster. * @param conf the base configuration * @param peer the description of replication peer * @return the configuration for the peer cluster, null if it was unable to get the configuration * @throws IOException when create peer cluster configuration failed */ public static Configuration getPeerClusterConfiguration(Configuration conf, ReplicationPeerDescription peer) throws IOException { ReplicationPeerConfig peerConfig = peer.getPeerConfig(); Configuration otherConf; try { otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey()); } catch (IOException e) { throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); } if (!peerConfig.getConfiguration().isEmpty()) { CompoundConfiguration compound = new CompoundConfiguration(); compound.add(otherConf); compound.addStringMap(peerConfig.getConfiguration()); return compound; } return otherConf; }
/** * Add sources for the given peer cluster on this region server. For the newly added peer, we only * need to enqueue the latest log of each wal group and do replication * @param id the id of the peer cluster * @return the source that was created * @throws IOException */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>(); this.walsById.put(id, walsByGroup); // Add the latest wal to that source's queue synchronized (latestPaths) { if (this.latestPaths.size() > 0) { for (Path logPath : latestPaths) { String name = logPath.getName(); String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name); SortedSet<String> logs = new TreeSet<String>(); logs.add(name); walsByGroup.put(walPrefix, logs); try { this.replicationQueues.addLog(id, name); } catch (ReplicationException e) { String message = "Cannot add log to queue when creating a new source, queueId=" + id + ", filename=" + name; server.stop(message); throw e; } src.enqueueLog(logPath); } } } } src.startup(); return src; }
@Test public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { // create a table with region replicas. Check whether the replication peer is created // and replication started. ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); String peerId = "region_replica_replication"; if (admin.getPeerConfig(peerId) != null) { admin.removePeer(peerId); } HTableDescriptor htd = HTU.createTableDescriptor( "testReplicationPeerIsCreated_no_region_replicas"); HTU.getHBaseAdmin().createTable(htd); ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); assertNull(peerConfig); htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); htd.setRegionReplication(2); HTU.getHBaseAdmin().createTable(htd); // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); }
@Test (timeout=240000) public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { // modify a table by adding region replicas. Check whether the replication peer is created // and replication started. ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); String peerId = "region_replica_replication"; if (admin.getPeerConfig(peerId) != null) { admin.removePeer(peerId); } HTableDescriptor htd = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable"); HTU.getHBaseAdmin().createTable(htd); // assert that replication peer is not created yet ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); assertNull(peerConfig); HTU.getHBaseAdmin().disableTable(htd.getTableName()); htd.setRegionReplication(2); HTU.getHBaseAdmin().modifyTable(htd.getTableName(), htd); HTU.getHBaseAdmin().enableTable(htd.getTableName()); // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); }
/** * Map of this cluster's peers for display. * @return A map of peer ids to peer cluster keys * @deprecated use {@link #listPeerConfigs()} */ @Deprecated public Map<String, String> listPeers() { Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs(); Map<String, String> ret = new HashMap<String, String>(peers.size()); for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) { ret.put(entry.getKey(), entry.getValue().getClusterKey()); } return ret; }
/** * * @param configuration * @param peerName * @param tableCFs * @throws ReplicationException * @throws IOException */ protected void addPeer(final Configuration configuration,String peerName, Map<TableName, List<String>> tableCFs) throws ReplicationException, IOException { try (ReplicationAdmin replicationAdmin = new ReplicationAdmin(configuration)) { ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() .setClusterKey(ZKConfig.getZooKeeperClusterKey(configuration)) .setReplicationEndpointImpl(HbaseEndpoint.class.getName()); replicationAdmin.addPeer(peerName, peerConfig, tableCFs); } }
/** * Add a new normal source to this region server * @param id the id of the peer cluster * @return the source that was created * @throws IOException */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); this.walsById.put(id, new TreeSet<String>()); // Add the latest wal to that source's queue if (this.latestPath != null) { String name = this.latestPath.getName(); this.walsById.get(id).add(name); try { this.replicationQueues.addLog(src.getPeerClusterZnode(), name); } catch (ReplicationException e) { String message = "Cannot add log to queue when creating a new source, queueId=" + src.getPeerClusterZnode() + ", filename=" + name; server.stop(message); throw e; } src.enqueueLog(this.latestPath); } } src.startup(); return src; }
private static String getPeerQuorumAddress(final Configuration conf) throws IOException { ZooKeeperWatcher localZKW = null; ReplicationPeerZKImpl peer = null; try { localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", new Abortable() { @Override public void abort(String why, Throwable e) {} @Override public boolean isAborted() {return false;} }); ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); rp.init(); Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId); if (pair == null) { throw new IOException("Couldn't get peer conf!"); } Configuration peerConf = rp.getPeerConf(peerId).getSecond(); return ZKUtil.getZooKeeperClusterKey(peerConf); } catch (ReplicationException e) { throw new IOException( "An error occured while trying to connect to the remove peer cluster", e); } finally { if (peer != null) { peer.close(); } if (localZKW != null) { localZKW.close(); } } }
public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) throws DoNotRetryIOException, ReplicationException { if (peerId.contains("-")) { throw new DoNotRetryIOException("Found invalid peer name: " + peerId); } checkPeerConfig(peerConfig); if (peers.containsKey(peerId)) { throw new DoNotRetryIOException("Replication peer " + peerId + " already exists"); } // make sure that there is no queues with the same peer id. This may happen when we create a // peer with the same id with a old deleted peer. If the replication queues for the old peer // have not been cleaned up yet then we should not create the new peer, otherwise the old wal // file may also be replicated. checkQueuesDeleted(peerId); }
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { if (peers.containsKey(peerId)) { // this should be a retry, just return return; } ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); peerStorage.addPeer(peerId, copiedPeerConfig, enabled); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); }
@Override public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) { return this .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall( RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); }
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { checkClusterKey(peerConfig.getClusterKey()); if (peerConfig.replicateAllUserTables()) { // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer // cluster. if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + "when you want replicate all cluster"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), peerConfig.getExcludeTableCFsMap()); } else { // If replicate_all flag is false, it means all user tables can't be replicated to peer // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer // cluster. if ((peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty()) || (peerConfig.getExcludeTableCFsMap() != null && !peerConfig.getExcludeTableCFsMap().isEmpty())) { throw new DoNotRetryIOException( "Need clean exclude-namespaces or exclude-table-cfs config firstly" + " when replicate_all flag is false"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); } checkConfiguredWALEntryFilters(peerConfig); }
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) throws ReplicationException { ReplicationPeerStorage peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); for (String peerId : peerStorage.listPeerIds()) { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); } return new ReplicationPeerManager(peerStorage, ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); }
public void preAddReplicationPeer(final String peerId, final ReplicationPeerConfig peerConfig) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { @Override public void call(MasterObserver observer) throws IOException { observer.preAddReplicationPeer(this, peerId, peerConfig); } }); }
public void postAddReplicationPeer(final String peerId, final ReplicationPeerConfig peerConfig) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { @Override public void call(MasterObserver observer) throws IOException { observer.postAddReplicationPeer(this, peerId, peerConfig); } }); }
public void preUpdateReplicationPeerConfig(final String peerId, final ReplicationPeerConfig peerConfig) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { @Override public void call(MasterObserver observer) throws IOException { observer.preUpdateReplicationPeerConfig(this, peerId, peerConfig); } }); }
public void postUpdateReplicationPeerConfig(final String peerId, final ReplicationPeerConfig peerConfig) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { @Override public void call(MasterObserver observer) throws IOException { observer.postUpdateReplicationPeerConfig(this, peerId, peerConfig); } }); }
@Override public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, Map<TableName, List<String>> tableCfs) { if (tableCfs == null) { return failedFuture(new ReplicationException("tableCfs is null")); } CompletableFuture<Void> future = new CompletableFuture<Void>(); getReplicationPeerConfig(id).whenComplete( (peerConfig, error) -> { if (!completeExceptionally(future, error)) { ReplicationPeerConfig newPeerConfig = null; try { newPeerConfig = ReplicationPeerConfigUtil .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); } catch (ReplicationException e) { future.completeExceptionally(e); return; } updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> { if (!completeExceptionally(future, error)) { future.complete(result); } }); } }); return future; }
@Override public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException, IOException { if (cpHost != null) { cpHost.preGetReplicationPeerConfig(peerId); } LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId); ReplicationPeerConfig peerConfig = this.replicationPeerManager.getPeerConfig(peerId) .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId)); if (cpHost != null) { cpHost.postGetReplicationPeerConfig(peerId); } return peerConfig; }
@Override public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException { LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId + ", config=" + peerConfig); return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig)); }