Java 类org.apache.hadoop.hbase.replication.ReplicationPeerConfig 实例源码

项目:ditb    文件:ServerRegionReplicaUtil.java   
/**
 * Registers the region replica replication peer when the feature is enabled and no peer
 * with that id has been registered yet.
 * @param conf configuration to use
 * @throws IOException if the admin fails; a {@code ReplicationException} is wrapped as the cause
 */
public static void setupRegionReplicaReplication(Configuration conf) throws IOException {
  if (!isRegionReplicaReplicationEnabled(conf)) {
    // Feature switched off: nothing to register.
    return;
  }
  ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
  try {
    boolean peerMissing =
        replicationAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null;
    if (peerMissing) {
      ReplicationPeerConfig regionReplicaPeer = new ReplicationPeerConfig();
      // The peer points back at the cluster described by conf.
      regionReplicaPeer.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
      regionReplicaPeer.setReplicationEndpointImpl(
        RegionReplicaReplicationEndpoint.class.getName());
      replicationAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, regionReplicaPeer, null);
    }
  } catch (ReplicationException ex) {
    throw new IOException(ex);
  } finally {
    replicationAdmin.close();
  }
}
项目:ditb    文件:TestReplicationAdminWithTwoDifferentZKClusters.java   
/**
 * Starts two mini clusters, registers the second one as a replication peer of the first,
 * and creates a globally-scoped table on the first cluster.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
  utility1 = new HBaseTestingUtility(conf1);
  utility1.startMiniCluster();
  admin = new ReplicationAdmin(conf1);

  // The second cluster copies conf1 but uses its own znode parent and ZooKeeper client port.
  conf2 = HBaseConfiguration.create(conf1);
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  conf2.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2182);

  utility2 = new HBaseTestingUtility(conf2);
  utility2.startMiniCluster();

  // Register cluster 2 as a peer of cluster 1 (no table-CF restriction is passed).
  ReplicationPeerConfig config = new ReplicationPeerConfig();
  config.setClusterKey(utility2.getClusterKey());
  admin.addPeer(peerId, config, null);

  // Table with a single family whose replication scope is global.
  HTableDescriptor table = new HTableDescriptor(tableName);
  HColumnDescriptor fam = new HColumnDescriptor(famName);
  fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  table.addFamily(fam);

  utility1.getHBaseAdmin().createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  utility1.waitUntilAllRegionsAssigned(tableName);
}
项目:ditb    文件:TestReplicationAdmin.java   
/**
 * Verifies that custom key/value properties stored on a peer's configuration are visible
 * on the peer object listed by ReplicationAdmin.
 */
@Test
public void testPeerConfig() throws Exception {
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
  peerConfig.setClusterKey(KEY_ONE);
  peerConfig.getConfiguration().put("key1", "value1");
  peerConfig.getConfiguration().put("key2", "value2");
  admin.addPeer(ID_ONE, peerConfig, null);

  // Exactly the peer added above is expected to be listed.
  List<ReplicationPeer> listedPeers = admin.listReplicationPeers();
  assertEquals(1, listedPeers.size());

  ReplicationPeer onlyPeer = listedPeers.get(0);
  assertNotNull(onlyPeer);
  assertEquals("value1", onlyPeer.getConfiguration().get("key1"));
  assertEquals("value2", onlyPeer.getConfiguration().get("key2"));

  admin.removePeer(ID_ONE);
}
项目:ditb    文件:ReplicationAdmin.java   
/**
 * Builds a ReplicationPeer object for every configured peer.
 * Peers whose configuration cannot be obtained are logged and left out of the result.
 * @return the peers that could be built, or null when no peer configs are listed
 */
@VisibleForTesting
List<ReplicationPeer> listReplicationPeers() {
  Map<String, ReplicationPeerConfig> configsById = listPeerConfigs();
  if (configsById == null || configsById.isEmpty()) {
    return null;
  }
  List<ReplicationPeer> result = new ArrayList<ReplicationPeer>(configsById.size());
  for (String peerId : configsById.keySet()) {
    try {
      Pair<ReplicationPeerConfig, Configuration> confPair =
          this.replicationPeers.getPeerConf(peerId);
      Configuration peerConf = confPair.getSecond();
      result.add(new ReplicationPeerZKImpl(peerConf, peerId, confPair.getFirst(),
          parseTableCFsFromConfig(this.getPeerTableCFs(peerId))));
    } catch (ReplicationException e) {
      // Skip this peer and keep going with the remaining ones.
      LOG.warn("Failed to get valid replication peers. "
          + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
          + e.getMessage());
      LOG.debug("Failure details to get valid replication peers.", e);
    }
  }
  return result;
}
项目:hbase    文件:ReplicationPeerConfigUpgrader.java   
/**
 * Walks every replication peer and, for peers that already restrict replication to specific
 * namespaces or table-CFs, sets the replicate-all flag to false and pushes the updated
 * config back through the admin.
 * <p>
 * A failure to update one peer is logged and does not stop the remaining peers from being
 * processed.
 * @throws Exception if the connection or admin cannot be created, or listing peers fails
 */
public void upgrade() throws Exception {
  // Both the connection and the admin are closeable; manage both so neither is leaked.
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
    admin.listReplicationPeers().forEach((peerDesc) -> {
      String peerId = peerDesc.getPeerId();
      ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
        peerConfig.setReplicateAllUserTables(false);
        try {
          admin.updateReplicationPeerConfig(peerId, peerConfig);
        } catch (Exception e) {
          // Best effort per peer: report and continue with the next one.
          LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
        }
      }
    });
  }
}
项目:hbase    文件:DumpReplicationQueues.java   
/**
 * Renders a multi-line, human-readable summary of the given replication peers.
 * @param peers the peer descriptions to render
 * @return one section per peer: id, state, cluster key, endpoint, table CFs and namespaces
 */
public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
  StringBuilder report = new StringBuilder();
  for (ReplicationPeerDescription peer : peers) {
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    report.append("Peer: ").append(peer.getPeerId()).append("\n");
    report.append("    State: ").append(peer.isEnabled() ? "ENABLED" : "DISABLED").append("\n");
    report.append("    Cluster Name: ").append(peerConfig.getClusterKey()).append("\n");
    report.append("    Replication Endpoint: ")
        .append(peerConfig.getReplicationEndpointImpl()).append("\n");
    Map<String, String> peerSettings = peerConfig.getConfiguration();
    // Only show when we have a custom configuration for the peer
    if (peerSettings.size() > 1) {
      report.append("    Peer Configuration: ").append(peerSettings).append("\n");
    }
    report.append("    Peer Table CFs: ").append(peerConfig.getTableCFsMap()).append("\n");
    report.append("    Peer Namespaces: ").append(peerConfig.getNamespaces()).append("\n");
  }
  return report.toString();
}
项目:hbase    文件:PeerProcedureHandlerImpl.java   
/**
 * Reloads the config of a cached peer and refreshes its replication sources when the
 * key part of the config has changed. Runs while holding the per-peer lock.
 * @throws ReplicationException if the peer is not cached
 */
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
  Lock lockForPeer = peersLock.acquireLock(peerId);
  try {
    ReplicationPeerImpl cachedPeer =
        replicationSourceManager.getReplicationPeers().getPeer(peerId);
    if (cachedPeer == null) {
      throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
    }
    // Capture the config before the refresh so the two can be compared afterwards.
    ReplicationPeerConfig before = cachedPeer.getPeerConfig();
    ReplicationPeerConfig after =
        replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
    boolean keyConfigChanged = !ReplicationUtils.isKeyConfigEqual(before, after);
    if (keyConfigChanged) {
      // RS need to start work with the new replication config change
      replicationSourceManager.refreshSources(peerId);
    }
  } finally {
    lockForPeer.unlock();
  }
}
项目:hbase    文件:ReplicationPeerManager.java   
/**
 * Validates an update to an existing peer's config before it is applied.
 * The new config must pass the general checks, the peer must exist, and neither the cluster
 * key nor the replication endpoint implementation may differ from the current config.
 * @throws DoNotRetryIOException if any of these checks fails
 */
public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
  checkPeerConfig(peerConfig);
  ReplicationPeerConfig current = checkPeerExists(peerId).getPeerConfig();

  String currentClusterKey = current.getClusterKey();
  String requestedClusterKey = peerConfig.getClusterKey();
  if (!isStringEquals(requestedClusterKey, currentClusterKey)) {
    throw new DoNotRetryIOException(
        "Changing the cluster key on an existing peer is not allowed. Existing key '" +
            currentClusterKey + "' for peer " + peerId + " does not match new key '" +
            requestedClusterKey + "'");
  }

  String currentEndpoint = current.getReplicationEndpointImpl();
  String requestedEndpoint = peerConfig.getReplicationEndpointImpl();
  if (!isStringEquals(requestedEndpoint, currentEndpoint)) {
    throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
        "on an existing peer is not allowed. Existing class '" +
        currentEndpoint + "' for peer " + peerId +
        " does not match new class '" + requestedEndpoint + "'");
  }
}
项目:hbase    文件:ReplicationPeerManager.java   
/**
 * Verifies that every WALEntryFilter class named in the peer configuration can be loaded
 * and instantiated through its no-argument constructor.
 * <p>
 * The class names are read as a comma-separated list from the
 * {@code BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY} entry; a missing or
 * empty entry means there is nothing to check.
 * @param peerConfig the peer config whose configured filters are validated
 * @throws DoNotRetryIOException if any listed class cannot be loaded or instantiated
 */
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
  String filterCSV = peerConfig.getConfiguration()
      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
  if (filterCSV == null || filterCSV.isEmpty()) {
    return;
  }
  for (String filter : filterCSV.split(",")) {
    try {
      // Class#newInstance is deprecated and rethrows whatever the constructor throws
      // unwrapped; going through the Constructor wraps such failures so they are reported
      // below like every other instantiation problem.
      Class.forName(filter).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
        " could not be created. Failing add/update " + "peer operation.", e);
    }
  }
}
项目:hbase    文件:TestReplicationSourceManager.java   
/**
 * Stores a peer, registers it with the source manager(s) and waits for it to show up.
 * @param peerId id of the peer to add
 * @param peerConfig config stored for the peer (stored as enabled)
 * @param waitForSource Whether to wait for replication source to initialize
 * @throws Exception if storing the peer or waiting for it fails
 */
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
    final boolean waitForSource) throws Exception {
  manager.getReplicationPeers().getPeerStorage().addPeer(peerId, peerConfig, true);
  try {
    manager.addPeer(peerId);
  } catch (Exception e) {
    // ignore the failed exception, because we'll test both success & failed case.
  }
  waitPeer(peerId, manager, waitForSource);

  if (managerOfCluster == null) {
    return;
  }
  // A second manager is present: register the peer there too and wait for it.
  managerOfCluster.addPeer(peerId);
  waitPeer(peerId, managerOfCluster, waitForSource);
}
项目:hbase    文件:TestReplicationAdminWithClusters.java   
/**
 * Updates a peer's config and checks that TestUpdatableReplicationEndpoint reports that it
 * has been called back.
 */
@Test(timeout=300000)
public void testReplicationPeerConfigUpdateCallback() throws Exception {
  String peerId = "1";
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
  peerConfig.setClusterKey(utility2.getClusterKey());
  peerConfig.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
  peerConfig.getConfiguration().put("key1", "value1");

  admin1.addReplicationPeer(peerId, peerConfig);

  // Change a property and push the update.
  peerConfig.getConfiguration().put("key1", "value2");
  admin.updatePeerConfig(peerId, peerConfig);

  boolean alreadyCalledBack = TestUpdatableReplicationEndpoint.hasCalledBack();
  if (!alreadyCalledBack) {
    // Wait on the endpoint class monitor for up to two seconds.
    synchronized(TestUpdatableReplicationEndpoint.class) {
      TestUpdatableReplicationEndpoint.class.wait(2000L);
    }
  }

  assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
}
项目:hbase    文件:TestReplicationAdminWithTwoDifferentZKClusters.java   
/**
 * Starts two mini clusters, registers the second one as a replication peer of the first,
 * and creates a globally-scoped table on the first cluster.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  utility1 = new HBaseTestingUtility(conf1);
  utility1.startMiniCluster();
  admin = new ReplicationAdmin(conf1);

  // The second cluster copies conf1 but uses its own znode parent and ZooKeeper client port.
  conf2 = HBaseConfiguration.create(conf1);
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  conf2.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2182);

  utility2 = new HBaseTestingUtility(conf2);
  utility2.startMiniCluster();

  // Register cluster 2 as a peer of cluster 1 (no table-CF restriction is passed).
  ReplicationPeerConfig config = new ReplicationPeerConfig();
  config.setClusterKey(utility2.getClusterKey());
  admin.addPeer(peerId, config, null);

  // Table with a single family whose replication scope is global.
  HTableDescriptor table = new HTableDescriptor(tableName);
  HColumnDescriptor fam = new HColumnDescriptor(famName);
  fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  table.addFamily(fam);

  utility1.getAdmin().createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  utility1.waitUntilAllRegionsAssigned(tableName);
}
项目:hbase    文件:RawAsyncHBaseAdmin.java   
/**
 * Appends the given table-CFs to a peer's config: fetches the current config, merges the
 * table-CFs into it and submits the merged config as an update.
 * @param id the peer id
 * @param tableCfs the table-CFs to append; must not be null
 * @return a future that completes once the update completes, or completes exceptionally if
 *         {@code tableCfs} is null, fetching the config fails, or the update fails
 */
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
    Map<TableName, List<String>> tableCfs) {
  if (tableCfs == null) {
    return failedFuture(new ReplicationException("tableCfs is null"));
  }

  CompletableFuture<Void> future = new CompletableFuture<Void>();
  getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
    if (!completeExceptionally(future, error)) {
      ReplicationPeerConfig newPeerConfig =
          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
      updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
        // Check the update's own error; the outer 'error' is known to be null at this point,
        // so testing it would report a failed update as success.
        if (!completeExceptionally(future, err)) {
          future.complete(result);
        }
      });
    }
  });
  return future;
}
项目:hbase    文件:TestReplicationAdmin.java   
/**
 * Verifies that custom key/value properties stored on a peer's configuration are returned
 * in the peer description listed by the admin.
 */
@Test
public void testPeerConfig() throws Exception {
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
  peerConfig.setClusterKey(KEY_ONE);
  peerConfig.getConfiguration().put("key1", "value1");
  peerConfig.getConfiguration().put("key2", "value2");
  hbaseAdmin.addReplicationPeer(ID_ONE, peerConfig);

  // Exactly the peer added above is expected to be listed.
  List<ReplicationPeerDescription> listedPeers = hbaseAdmin.listReplicationPeers();
  assertEquals(1, listedPeers.size());

  ReplicationPeerDescription onlyPeer = listedPeers.get(0);
  assertNotNull(onlyPeer);
  assertEquals("value1", onlyPeer.getPeerConfig().getConfiguration().get("key1"));
  assertEquals("value2", onlyPeer.getPeerConfig().getConfiguration().get("key2"));

  hbaseAdmin.removeReplicationPeer(ID_ONE);
}
项目:hbase    文件:TestReplicationAdmin.java   
/**
 * Checks that a freshly added peer reports as enabled and reports as disabled after
 * disablePeer is called.
 * @throws Exception on any unexpected failure
 */
@Test
public void testEnableDisable() throws Exception {
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
  peerConfig.setClusterKey(KEY_ONE);
  admin.addPeer(ID_ONE, peerConfig, null);

  assertEquals(1, admin.getPeersCount());
  assertTrue(admin.getPeerState(ID_ONE));

  admin.disablePeer(ID_ONE);
  assertFalse(admin.getPeerState(ID_ONE));

  try {
    // ID_SECOND is never added in this test; a not-found exception is tolerated here,
    // but its absence is not treated as a failure.
    admin.getPeerState(ID_SECOND);
  } catch (ReplicationPeerNotFoundException e) {
    // OK!
  }
  admin.removePeer(ID_ONE);
}
项目:hbase    文件:TestReplicationAdmin.java   
/**
 * Checks that the replicate-all flag is true for a newly added peer and that toggling it
 * through updateReplicationPeerConfig is reflected when the config is read back.
 */
@Test
public void testSetReplicateAllUserTables() throws Exception {
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
  peerConfig.setClusterKey(KEY_ONE);
  hbaseAdmin.addReplicationPeer(ID_ONE, peerConfig);

  // Freshly added peer: flag is expected to be on.
  peerConfig = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
  assertTrue(peerConfig.replicateAllUserTables());

  // Switch it off and read it back.
  peerConfig.setReplicateAllUserTables(false);
  hbaseAdmin.updateReplicationPeerConfig(ID_ONE, peerConfig);
  peerConfig = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
  assertFalse(peerConfig.replicateAllUserTables());

  // Switch it back on and read it back.
  peerConfig.setReplicateAllUserTables(true);
  hbaseAdmin.updateReplicationPeerConfig(ID_ONE, peerConfig);
  peerConfig = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
  assertTrue(peerConfig.replicateAllUserTables());

  hbaseAdmin.removeReplicationPeer(ID_ONE);
}
项目:hbase    文件:TestAsyncReplicationAdminApiWithClusters.java   
/**
 * Starts two mini clusters, opens async connections to them, and registers the second
 * cluster as replication peer ID_SECOND of the first.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Client timeouts/retries applied to the first cluster's configuration before start-up.
  TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
  TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
  TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
  TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
  TEST_UTIL.startMiniCluster();
  ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();

  // The second cluster copies the first configuration but uses its own znode parent.
  conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  TEST_UTIL2 = new HBaseTestingUtility(conf2);
  TEST_UTIL2.startMiniCluster();
  admin2 =
      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();

  // Register cluster 2 as a peer of cluster 1; join() blocks until the call completes.
  ReplicationPeerConfig rpc = new ReplicationPeerConfig();
  rpc.setClusterKey(TEST_UTIL2.getClusterKey());
  ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
}
项目:hbase    文件:TestAsyncReplicationAdminApi.java   
/**
 * Verifies that custom key/value properties stored on a peer's configuration are returned
 * in the peer description listed by the async admin.
 */
@Test
public void testPeerConfig() throws Exception {
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
  peerConfig.setClusterKey(KEY_ONE);
  peerConfig.getConfiguration().put("key1", "value1");
  peerConfig.getConfiguration().put("key2", "value2");
  admin.addReplicationPeer(ID_ONE, peerConfig).join();

  // Exactly the peer added above is expected to be listed.
  List<ReplicationPeerDescription> listedPeers = admin.listReplicationPeers().get();
  assertEquals(1, listedPeers.size());

  ReplicationPeerDescription onlyPeer = listedPeers.get(0);
  assertNotNull(onlyPeer);
  assertEquals("value1", onlyPeer.getPeerConfig().getConfiguration().get("key1"));
  assertEquals("value2", onlyPeer.getPeerConfig().getConfiguration().get("key2"));

  admin.removeReplicationPeer(ID_ONE).join();
}
项目:hbase    文件:ReplicationPeerConfigUtil.java   
/**
 * Deserializes a peer config from the content of a peer znode.
 * Content carrying the protobuf magic prefix is parsed as a ReplicationPeer message;
 * any other non-empty content is taken to be the cluster key string itself.
 * @param bytes Content of a peer znode.
 * @return the peer config built from the passed bytes
 * @throws DeserializationException if the protobuf content cannot be read, or the
 *           non-protobuf content is null or empty
 */
public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
    throws DeserializationException {
  if (!ProtobufUtil.isPBMagicPrefix(bytes)) {
    if (bytes == null || bytes.length <= 0) {
      throw new DeserializationException("Bytes to deserialize should not be empty.");
    }
    return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
  }

  // Protobuf payload starts right after the magic prefix.
  int magicLength = ProtobufUtil.lengthOfPBMagic();
  ReplicationProtos.ReplicationPeer.Builder peerBuilder =
      ReplicationProtos.ReplicationPeer.newBuilder();
  ReplicationProtos.ReplicationPeer parsedPeer;
  try {
    ProtobufUtil.mergeFrom(peerBuilder, bytes, magicLength, bytes.length - magicLength);
    parsedPeer = peerBuilder.build();
  } catch (IOException e) {
    throw new DeserializationException(e);
  }
  return convert(parsedPeer);
}
项目:hbase    文件:ReplicationPeerConfigUtil.java   
/**
 * Builds the configuration used to talk to the cluster behind a replication peer.
 * A cluster configuration is derived from the base configuration and the peer's cluster
 * key; when the peer config carries its own settings, they are layered on top.
 * @param conf the base configuration
 * @param peer the description of replication peer
 * @return the configuration for the peer cluster
 * @throws IOException when create peer cluster configuration failed
 */
public static Configuration getPeerClusterConfiguration(Configuration conf,
    ReplicationPeerDescription peer) throws IOException {
  ReplicationPeerConfig peerConfig = peer.getPeerConfig();
  Configuration clusterConf;
  try {
    clusterConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
  } catch (IOException e) {
    throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
  }

  if (peerConfig.getConfiguration().isEmpty()) {
    // No peer-specific settings: the plain cluster configuration is enough.
    return clusterConf;
  }

  CompoundConfiguration layered = new CompoundConfiguration();
  layered.add(clusterConf);
  layered.addStringMap(peerConfig.getConfiguration());
  return layered;
}
项目:ditb    文件:ReplicationSourceManager.java   
/**
 * Add sources for the given peer cluster on this region server. For the newly added peer, we only
 * need to enqueue the latest log of each wal group and do replication
 * @param id the id of the peer cluster
 * @return the source that was created
 * @throws IOException
 * @throws ReplicationException if a latest wal cannot be added to the replication queue; the
 *           server is asked to stop before the exception is rethrown
 */
protected ReplicationSourceInterface addSource(String id) throws IOException,
    ReplicationException {
  ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
  ReplicationPeer peer = replicationPeers.getPeer(id);
  ReplicationSourceInterface src =
      getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
        this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
  // Registration of the source and its per-group wal sets happens under the walsById lock.
  synchronized (this.walsById) {
    this.sources.add(src);
    Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
    this.walsById.put(id, walsByGroup);
    // Add the latest wal to that source's queue
    synchronized (latestPaths) {
      if (this.latestPaths.size() > 0) {
        for (Path logPath : latestPaths) {
          String name = logPath.getName();
          // Wals are grouped by the prefix derived from the wal file name.
          String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
          SortedSet<String> logs = new TreeSet<String>();
          logs.add(name);
          walsByGroup.put(walPrefix, logs);
          try {
            this.replicationQueues.addLog(id, name);
          } catch (ReplicationException e) {
            // Failing to record the wal in the queue stops the server, then propagates.
            String message =
                "Cannot add log to queue when creating a new source, queueId=" + id
                    + ", filename=" + name;
            server.stop(message);
            throw e;
          }
          src.enqueueLog(logPath);
        }
      }
    }
  }
  // The source is started only after both locks have been released.
  src.startup();
  return src;
}
项目:ditb    文件:TestRegionReplicaReplicationEndpoint.java   
/**
 * Creating a table without region replicas must not create the region replica replication
 * peer; creating one with region replication set to 2 must create it with the local cluster
 * key and the RegionReplicaReplicationEndpoint implementation.
 */
@Test
public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
  // create a table with region replicas. Check whether the replication peer is created
  // and replication started.
  String peerId = "region_replica_replication";

  // try-with-resources so the admin is closed even when an assertion fails.
  try (ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration())) {
    // Start from a clean state: drop the peer if an earlier test left it behind.
    if (admin.getPeerConfig(peerId) != null) {
      admin.removePeer(peerId);
    }

    HTableDescriptor htd = HTU.createTableDescriptor(
      "testReplicationPeerIsCreated_no_region_replicas");
    HTU.getHBaseAdmin().createTable(htd);
    ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
    assertNull(peerConfig);

    htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
    htd.setRegionReplication(2);
    HTU.getHBaseAdmin().createTable(htd);

    // assert peer configuration is correct (expected value first, actual second)
    peerConfig = admin.getPeerConfig(peerId);
    assertNotNull(peerConfig);
    assertEquals(ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()),
      peerConfig.getClusterKey());
    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
      peerConfig.getReplicationEndpointImpl());
  }
}
项目:ditb    文件:TestRegionReplicaReplicationEndpoint.java   
/**
 * Modifying an existing table to use region replication of 2 must create the region
 * replica replication peer with the local cluster key and the
 * RegionReplicaReplicationEndpoint implementation; before the modification the peer must
 * not exist.
 */
@Test (timeout=240000)
public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
  // modify a table by adding region replicas. Check whether the replication peer is created
  // and replication started.
  String peerId = "region_replica_replication";

  // try-with-resources so the admin is closed even when an assertion fails.
  try (ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration())) {
    // Start from a clean state: drop the peer if an earlier test left it behind.
    if (admin.getPeerConfig(peerId) != null) {
      admin.removePeer(peerId);
    }

    HTableDescriptor htd
      = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
    HTU.getHBaseAdmin().createTable(htd);

    // assert that replication peer is not created yet
    ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
    assertNull(peerConfig);

    HTU.getHBaseAdmin().disableTable(htd.getTableName());
    htd.setRegionReplication(2);
    HTU.getHBaseAdmin().modifyTable(htd.getTableName(), htd);
    HTU.getHBaseAdmin().enableTable(htd.getTableName());

    // assert peer configuration is correct (expected value first, actual second)
    peerConfig = admin.getPeerConfig(peerId);
    assertNotNull(peerConfig);
    assertEquals(ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()),
      peerConfig.getClusterKey());
    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
      peerConfig.getReplicationEndpointImpl());
  }
}
项目:ditb    文件:ReplicationAdmin.java   
/**
 * Returns, for display purposes, the cluster key of every peer of this cluster.
 * @return A map of peer ids to peer cluster keys
 * @deprecated use {@link #listPeerConfigs()}
 */
@Deprecated
public Map<String, String> listPeers() {
  Map<String, ReplicationPeerConfig> configsById = this.listPeerConfigs();
  Map<String, String> clusterKeysById = new HashMap<String, String>(configsById.size());

  for (String peerId : configsById.keySet()) {
    clusterKeysById.put(peerId, configsById.get(peerId).getClusterKey());
  }
  return clusterKeysById;
}
项目:hbase-connect-kafka    文件:BaseTest.java   
/**
* Registers a replication peer that targets the cluster described by the given
* configuration and uses HbaseEndpoint as its replication endpoint.
* @param configuration configuration used to create the admin and derive the cluster key
* @param peerName id under which the peer is added
* @param tableCFs table-CFs passed through to the admin when adding the peer
* @throws ReplicationException
* @throws IOException
*/
  protected void addPeer(final Configuration configuration,String peerName, Map<TableName, List<String>> tableCFs)
     throws ReplicationException, IOException {
      try (ReplicationAdmin admin = new ReplicationAdmin(configuration)) {
          ReplicationPeerConfig endpointPeer = new ReplicationPeerConfig();
          endpointPeer.setClusterKey(ZKConfig.getZooKeeperClusterKey(configuration));
          endpointPeer.setReplicationEndpointImpl(HbaseEndpoint.class.getName());

          admin.addPeer(peerName, endpointPeer, tableCFs);
      }
  }
项目:pbase    文件:ReplicationSourceManager.java   
/**
 * Add a new normal source to this region server
 * @param id the id of the peer cluster
 * @return the source that was created
 * @throws IOException
 * @throws ReplicationException if the latest wal cannot be added to the replication queue; the
 *           server is asked to stop before the exception is rethrown
 */
protected ReplicationSourceInterface addSource(String id) throws IOException,
    ReplicationException {
  ReplicationPeerConfig peerConfig
    = replicationPeers.getReplicationPeerConfig(id);
  ReplicationPeer peer = replicationPeers.getPeer(id);
  ReplicationSourceInterface src =
      getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
        this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
  // Registration of the source and its wal set happens under the walsById lock.
  synchronized (this.walsById) {
    this.sources.add(src);
    this.walsById.put(id, new TreeSet<String>());
    // Add the latest wal to that source's queue
    if (this.latestPath != null) {
      String name = this.latestPath.getName();
      this.walsById.get(id).add(name);
      try {
        this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
      } catch (ReplicationException e) {
        // Failing to record the wal in the queue stops the server, then propagates.
        String message =
            "Cannot add log to queue when creating a new source, queueId="
                + src.getPeerClusterZnode() + ", filename=" + name;
        server.stop(message);
        throw e;
      }
      src.enqueueLog(this.latestPath);
    }
  }
  // The source is started only after the lock has been released.
  src.startup();
  return src;
}
项目:pbase    文件:VerifyReplication.java   
/**
 * Resolves the ZooKeeper cluster key of the peer identified by {@code peerId}.
 * A temporary ZooKeeperWatcher is opened on the local cluster to read the peer's
 * configuration and is closed before returning.
 * @param conf configuration of the local cluster
 * @return the ZooKeeper cluster key derived from the peer's configuration
 * @throws IOException if the peer configuration is unavailable, or a replication error
 *           occurs (wrapped as the cause)
 */
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  try {
    // The watcher needs an Abortable; this one ignores aborts and never reports aborted.
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
        new Abortable() {
          @Override public void abort(String why, Throwable e) {}
          @Override public boolean isAborted() {return false;}
        });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
    if (pair == null) {
      throw new IOException("Couldn't get peer conf!");
    }
    // Reuse the pair that was just null-checked instead of fetching the peer conf again.
    return ZKUtil.getZooKeeperClusterKey(pair.getSecond());
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occured while trying to connect to the remove peer cluster", e);
  } finally {
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
项目:pbase    文件:ReplicationAdmin.java   
/**
 * Returns, for display purposes, the cluster key of every peer of this cluster.
 * @return A map of peer ids to peer cluster keys
 * @deprecated use {@link #listPeerConfigs()}
 */
@Deprecated
public Map<String, String> listPeers() {
  Map<String, ReplicationPeerConfig> configsById = this.listPeerConfigs();
  Map<String, String> clusterKeysById = new HashMap<String, String>(configsById.size());

  for (String peerId : configsById.keySet()) {
    clusterKeysById.put(peerId, configsById.get(peerId).getClusterKey());
  }
  return clusterKeysById;
}
项目:hbase    文件:ReplicationPeerManager.java   
/**
 * Validates a request to add a new peer before anything is stored.
 * Rejects ids containing '-', configs that fail the general checks, and ids that are
 * already registered; finally checks that no queues remain for the id.
 * @throws DoNotRetryIOException if the id or config is invalid, or the peer already exists
 * @throws ReplicationException declared for the queue check
 */
public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException, ReplicationException {
  boolean hasDash = peerId.indexOf('-') >= 0;
  if (hasDash) {
    throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
  }
  checkPeerConfig(peerConfig);
  boolean alreadyRegistered = peers.containsKey(peerId);
  if (alreadyRegistered) {
    throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
  }
  // A deleted peer with the same id may still have replication queues that were not cleaned
  // up yet. Creating the new peer in that state could replicate the old wal files as well,
  // so make sure no such queues exist.
  checkQueuesDeleted(peerId);
}
项目:hbase    文件:ReplicationPeerManager.java   
/**
 * Stores a new peer and caches its description. Does nothing when the peer id is already
 * cached, which is treated as a retried call.
 * @param peerId id of the peer to add
 * @param peerConfig config of the peer; a copy of it is stored and cached
 * @param enabled whether the peer is stored as enabled
 * @throws ReplicationException declared for the peer storage call
 */
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
    throws ReplicationException {
  if (peers.containsKey(peerId)) {
    // this should be a retry, just return
    return;
  }
  ReplicationPeerConfig configCopy = ReplicationPeerConfig.newBuilder(peerConfig).build();
  peerStorage.addPeer(peerId, configCopy, enabled);
  ReplicationPeerDescription description =
      new ReplicationPeerDescription(peerId, enabled, configCopy);
  peers.put(peerId, description);
}
项目:hbase    文件:RawAsyncHBaseAdmin.java   
/**
 * Submits an update of a peer's config as a procedure call.
 * @param peerId id of the peer to update
 * @param peerConfig the new config for the peer
 * @return the future returned by the procedure call
 */
@Override
public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
    ReplicationPeerConfig peerConfig) {
  UpdateReplicationPeerConfigRequest request =
      RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig);
  return this
      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
        request,
        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
        (resp) -> resp.getProcId(),
        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
}
项目:hbase    文件:ReplicationPeerManager.java   
/**
 * Validates a peer config: the cluster key, the consistency between the replicate-all flag
 * and the include/exclude namespace and table-CF settings, and the configured WAL entry
 * filters.
 * @throws DoNotRetryIOException if any check fails
 */
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
  checkClusterKey(peerConfig.getClusterKey());

  if (peerConfig.replicateAllUserTables()) {
    // replicate_all is on: every user table is replicated, so only exclusions may be
    // configured. Explicit namespaces or table-cfs are rejected.
    boolean hasNamespaces =
        peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty();
    boolean hasTableCFs =
        peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty();
    if (hasNamespaces || hasTableCFs) {
      throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
          + "when you want replicate all cluster");
    }
    checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
      peerConfig.getExcludeTableCFsMap());
  } else {
    // replicate_all is off: only explicitly listed namespaces or table-cfs are replicated,
    // so exclusion settings are rejected.
    boolean hasExcludedNamespaces = peerConfig.getExcludeNamespaces() != null
        && !peerConfig.getExcludeNamespaces().isEmpty();
    boolean hasExcludedTableCFs = peerConfig.getExcludeTableCFsMap() != null
        && !peerConfig.getExcludeTableCFsMap().isEmpty();
    if (hasExcludedNamespaces || hasExcludedTableCFs) {
      throw new DoNotRetryIOException(
          "Need clean exclude-namespaces or exclude-table-cfs config firstly"
              + " when replicate_all flag is false");
    }
    checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
      peerConfig.getTableCFsMap());
  }

  checkConfiguredWALEntryFilters(peerConfig);
}
项目:hbase    文件:ReplicationPeerManager.java   
/**
 * Creates a manager whose peer cache is pre-populated from the peer storage.
 * @param zk watcher handed to the storage factory
 * @param conf configuration handed to the storage factory
 * @return a manager holding a description (id, enabled state, config) for every stored peer
 * @throws ReplicationException declared for the storage reads
 */
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
    throws ReplicationException {
  ReplicationPeerStorage storage =
      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
  ConcurrentMap<String, ReplicationPeerDescription> cachedPeers = new ConcurrentHashMap<>();
  for (String peerId : storage.listPeerIds()) {
    ReplicationPeerConfig storedConfig = storage.getPeerConfig(peerId);
    boolean storedEnabled = storage.isPeerEnabled(peerId);
    ReplicationPeerDescription description =
        new ReplicationPeerDescription(peerId, storedEnabled, storedConfig);
    cachedPeers.put(peerId, description);
  }
  return new ReplicationPeerManager(storage,
      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), cachedPeers);
}
项目:hbase    文件:MasterCoprocessorHost.java   
/**
 * Runs the preAddReplicationPeer hook of the master observers; a null operation is passed
 * on when no coprocessor environments are present.
 */
public void preAddReplicationPeer(final String peerId, final ReplicationPeerConfig peerConfig)
    throws IOException {
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation() {
      @Override
      public void call(MasterObserver observer) throws IOException {
        observer.preAddReplicationPeer(this, peerId, peerConfig);
      }
    };
  }
  execOperation(operation);
}
项目:hbase    文件:MasterCoprocessorHost.java   
/**
 * Runs the postAddReplicationPeer hook of the master observers; a null operation is passed
 * on when no coprocessor environments are present.
 */
public void postAddReplicationPeer(final String peerId, final ReplicationPeerConfig peerConfig)
    throws IOException {
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation() {
      @Override
      public void call(MasterObserver observer) throws IOException {
        observer.postAddReplicationPeer(this, peerId, peerConfig);
      }
    };
  }
  execOperation(operation);
}
项目:hbase    文件:MasterCoprocessorHost.java   
/**
 * Runs the preUpdateReplicationPeerConfig hook of the master observers; a null operation is
 * passed on when no coprocessor environments are present.
 */
public void preUpdateReplicationPeerConfig(final String peerId,
    final ReplicationPeerConfig peerConfig) throws IOException {
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation() {
      @Override
      public void call(MasterObserver observer) throws IOException {
        observer.preUpdateReplicationPeerConfig(this, peerId, peerConfig);
      }
    };
  }
  execOperation(operation);
}
项目:hbase    文件:MasterCoprocessorHost.java   
/**
 * Runs the postUpdateReplicationPeerConfig hook of the master observers; a null operation is
 * passed on when no coprocessor environments are present.
 */
public void postUpdateReplicationPeerConfig(final String peerId,
    final ReplicationPeerConfig peerConfig) throws IOException {
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation() {
      @Override
      public void call(MasterObserver observer) throws IOException {
        observer.postUpdateReplicationPeerConfig(this, peerId, peerConfig);
      }
    };
  }
  execOperation(operation);
}
项目:hbase    文件:RawAsyncHBaseAdmin.java   
/**
 * Removes the given table-CFs from a peer's config: fetches the current config, removes the
 * table-CFs from it and submits the result as an update.
 * @param id the peer id
 * @param tableCfs the table-CFs to remove; must not be null
 * @return a future that completes once the update completes, or completes exceptionally if
 *         {@code tableCfs} is null, fetching the config fails, the removal is rejected with
 *         a ReplicationException, or the update fails
 */
@Override
public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
    Map<TableName, List<String>> tableCfs) {
  if (tableCfs == null) {
    return failedFuture(new ReplicationException("tableCfs is null"));
  }

  CompletableFuture<Void> future = new CompletableFuture<Void>();
  getReplicationPeerConfig(id).whenComplete(
    (peerConfig, error) -> {
      if (!completeExceptionally(future, error)) {
        ReplicationPeerConfig newPeerConfig = null;
        try {
          newPeerConfig = ReplicationPeerConfigUtil
              .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
        } catch (ReplicationException e) {
          future.completeExceptionally(e);
          return;
        }
        updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
          // Check the update's own error; the outer 'error' is known to be null at this
          // point, so testing it would report a failed update as success.
          if (!completeExceptionally(future, err)) {
            future.complete(result);
          }
        });
      }
    });
  return future;
}
项目:hbase    文件:HMaster.java   
/**
 * Looks up the config of a replication peer, invoking the coprocessor pre/post hooks around
 * the lookup when a coprocessor host is set.
 * @param peerId id of the peer
 * @return the peer's config
 * @throws ReplicationPeerNotFoundException if the peer manager has no config for the id
 */
@Override
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
    throws ReplicationException, IOException {
  if (cpHost != null) {
    cpHost.preGetReplicationPeerConfig(peerId);
  }
  String auditMessage = getClientIdAuditPrefix() + " get replication peer config, id=" + peerId;
  LOG.info(auditMessage);
  ReplicationPeerConfig found = this.replicationPeerManager.getPeerConfig(peerId)
      .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId));
  if (cpHost != null) {
    cpHost.postGetReplicationPeerConfig(peerId);
  }
  return found;
}
项目:hbase    文件:HMaster.java   
/**
 * Logs the request and runs an UpdatePeerConfigProcedure for the peer.
 * @param peerId id of the peer to update
 * @param peerConfig the new config for the peer
 * @return the value returned by executePeerProcedure
 */
@Override
public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException, IOException {
  String auditMessage = getClientIdAuditPrefix() + " update replication peer config, id="
      + peerId + ", config=" + peerConfig;
  LOG.info(auditMessage);
  UpdatePeerConfigProcedure procedure = new UpdatePeerConfigProcedure(peerId, peerConfig);
  return executePeerProcedure(procedure);
}