Example source code for the Java class org.apache.hadoop.hbase.replication.ReplicationPeers

Project: HIndex    File: ReplicationSourceManager.java
/**
 * Factory method to create a replication source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerId the id of the peer cluster
 * @param clusterId the UUID of this cluster
 * @return the created source
 * @throws IOException if the source fails to initialize
 */
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
    final FileSystem fs, final ReplicationSourceManager manager,
    final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
    final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
  ReplicationSourceInterface src;
  try {
    @SuppressWarnings("rawtypes")
    Class c = Class.forName(conf.get("replication.replicationsource.implementation",
        ReplicationSource.class.getCanonicalName()));
    src = (ReplicationSourceInterface) c.newInstance();
  } catch (Exception e) {
    LOG.warn("Passed replication source implementation throws errors, " +
        "defaulting to ReplicationSource", e);
    src = new ReplicationSource();
  }
  src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
  return src;
}
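
The factory above resolves its implementation class reflectively from the replication.replicationsource.implementation key and falls back to ReplicationSource on any failure. A minimal sketch of plugging in a custom source, assuming a hypothetical class com.example.MyReplicationSource that implements ReplicationSourceInterface:

Configuration conf = HBaseConfiguration.create();
// com.example.MyReplicationSource is hypothetical; it must implement
// ReplicationSourceInterface and have a public no-arg constructor, because the
// factory above instantiates it via Class.forName(...).newInstance().
conf.set("replication.replicationsource.implementation", "com.example.MyReplicationSource");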
Project: hbase    File: TestReplicationSourceManager.java
/**
 * Add a peer and wait for it to initialize
 * @param peerId the id of the peer to add
 * @param peerConfig the configuration of the peer
 * @param waitForSource whether to wait for the replication source to initialize
 * @throws Exception
 */
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
    final boolean waitForSource) throws Exception {
  final ReplicationPeers rp = manager.getReplicationPeers();
  rp.getPeerStorage().addPeer(peerId, peerConfig, true);
  try {
    manager.addPeer(peerId);
  } catch (Exception e) {
    // Ignore the exception; we exercise both the success and the failure case.
  }
  waitPeer(peerId, manager, waitForSource);
  if (managerOfCluster != null) {
    managerOfCluster.addPeer(peerId);
    waitPeer(peerId, managerOfCluster, waitForSource);
  }
}
Project: hbase    File: TestReplicationSourceManager.java
private static void waitPeer(final String peerId,
    ReplicationSourceManager manager, final boolean waitForSource) {
  ReplicationPeers rp = manager.getReplicationPeers();
  Waiter.waitFor(conf, 20000, () -> {
    if (waitForSource) {
      ReplicationSourceInterface rs = manager.getSource(peerId);
      if (rs == null) {
        return false;
      }
      if (rs instanceof ReplicationSourceDummy) {
        return ((ReplicationSourceDummy)rs).isStartup();
      }
      return true;
    } else {
      return (rp.getPeer(peerId) != null);
    }
  });
}
Project: hbase    File: TestReplicationSourceManager.java
/**
 * Remove a peer and wait for it to get cleaned up
 * @param peerId the id of the peer to remove
 * @throws Exception
 */
private void removePeerAndWait(final String peerId) throws Exception {
  final ReplicationPeers rp = manager.getReplicationPeers();
  if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
    rp.getPeerStorage().removePeer(peerId);
    try {
      manager.removePeer(peerId);
    } catch (Exception e) {
      // Ignore the exception and continue.
    }
  }
  Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      Collection<String> peers = rp.getPeerStorage().listPeerIds();
      return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
          && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
    }
  });
}
Project: PyroDB    File: ReplicationSourceManager.java
/**
 * Factory method to create a replication source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerId the id of the peer cluster
 * @param clusterId the UUID of this cluster
 * @return the created source
 * @throws IOException if the source fails to initialize
 */
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
    final FileSystem fs, final ReplicationSourceManager manager,
    final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
    final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
  ReplicationSourceInterface src;
  try {
    @SuppressWarnings("rawtypes")
    Class c = Class.forName(conf.get("replication.replicationsource.implementation",
        ReplicationSource.class.getCanonicalName()));
    src = (ReplicationSourceInterface) c.newInstance();
  } catch (Exception e) {
    LOG.warn("Passed replication source implementation throws errors, " +
        "defaulting to ReplicationSource", e);
    src = new ReplicationSource();
  }
  src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
  return src;
}
Project: c5    File: ReplicationSourceManager.java
/**
 * Factory method to create a replication source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerId the id of the peer cluster
 * @param clusterId the UUID of this cluster
 * @return the created source
 * @throws IOException if the source fails to initialize
 */
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
    final FileSystem fs, final ReplicationSourceManager manager,
    final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
    final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
  ReplicationSourceInterface src;
  try {
    @SuppressWarnings("rawtypes")
    Class c = Class.forName(conf.get("replication.replicationsource.implementation",
        ReplicationSource.class.getCanonicalName()));
    src = (ReplicationSourceInterface) c.newInstance();
  } catch (Exception e) {
    LOG.warn("Passed replication source implementation throws errors, " +
        "defaulting to ReplicationSource", e);
    src = new ReplicationSource();
  }
  src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
  return src;
}
Project: ditb    File: ReplicationSourceManager.java
/**
 * Creates a replication manager and sets the watch on all the other registered region servers
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param replicationTracker the tracker of replication state changes
 * @param conf the configuration to use
 * @param server the server for this region server
 * @param fs the file system to use
 * @param logDir the directory that contains all wal directories of live RSs
 * @param oldLogDir the directory where old logs are archived
 * @param clusterId the UUID of this cluster
 */
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
    final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
    final Path oldLogDir, final UUID clusterId) {
  // CopyOnWriteArrayList is thread-safe.
  // Generally, reads outnumber writes.
  this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.replicationTracker = replicationTracker;
  this.server = server;
  this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
  this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
  this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.oldLogDir = oldLogDir;
  this.sleepBeforeFailover =
      conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
  this.clusterId = clusterId;
  this.replicationTracker.registerListener(this);
  this.replicationPeers.getAllPeerIds();
  // It's preferable to fail over one RS at a time, but with good zk servers
  // more could be processed at the same time.
  int nbWorkers = conf.getInt("replication.executor.workers", 1);
  // use a short 100ms sleep since this could be done inline with a RS startup
  // even if we fail, other region servers can take care of it
  this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
      100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>());
  ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
  tfb.setNameFormat("ReplicationExecutor-%d");
  tfb.setDaemon(true);
  this.executor.setThreadFactory(tfb.build());
  this.rand = new Random();
  this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
}
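
The constructor above reads two failover knobs from the configuration. A hedged sketch of tuning them; the values are illustrative, and the defaults (30000 ms and 1 worker) are visible in the code:

Configuration conf = HBaseConfiguration.create();
// Wait 10 s instead of the default 30 s before claiming a dead RS's queues.
conf.setLong("replication.sleep.before.failover", 10000);
// Allow two NodeFailoverWorker tasks to run concurrently.
conf.setInt("replication.executor.workers", 2);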
Project: ditb    File: ReplicationSourceManager.java
/**
 * @param rsZnode the znode of the region server whose queues are being claimed
 */
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final UUID clusterId) {
  super("Failover-for-"+rsZnode);
  this.rsZnode = rsZnode;
  this.rq = replicationQueues;
  this.rp = replicationPeers;
  this.clusterId = clusterId;
}
Project: ditb    File: ReplicationSource.java
/**
 * Instantiation method used by region servers
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for replication source
 * @throws IOException
 */
@Override
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics)
        throws IOException {
  this.stopper = stopper;
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
  this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
  this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
  long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.manager = manager;
  this.fs = fs;
  this.metrics = metrics;
  this.clusterId = clusterId;

  this.peerClusterZnode = peerClusterZnode;
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
  this.replicationEndpoint = replicationEndpoint;
}
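
init() above sizes its ReplicationThrottler from replication.source.per.peer.node.bandwidth (0, the default, disables throttling; the configured bytes/second are divided by 10 to get a per-cycle budget), and its retry ceiling is sleepForRetries times maxRetriesMultiplier: 1000 ms * 300 = 5 minutes, matching the inline comment. An illustrative configuration sketch, values hypothetical:

Configuration conf = HBaseConfiguration.create();
// Cap shipping at ~1 MB/s per peer node; 0 (the default) disables throttling.
conf.setLong("replication.source.per.peer.node.bandwidth", 1024 * 1024);
// Retry every 2 s, up to 150 times: 2 s * 150 keeps the same 5-minute ceiling.
conf.setLong("replication.source.sleepforretries", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 150);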
Project: ditb    File: TestReplicationSourceManager.java
@Test
public void testCleanupFailoverQueues() throws Exception {
  final Server server = new DummyServer("hostname1.example.org");
  ReplicationQueues rq =
      ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
        server);
  rq.init(server.getServerName().toString());
  // populate some znodes in the peer znode
  SortedSet<String> files = new TreeSet<String>();
  String group = "testgroup";
  String file1 = group + ".log1";
  String file2 = group + ".log2";
  files.add(file1);
  files.add(file2);
  for (String file : files) {
    rq.addLog("1", file);
  }
  Server s1 = new DummyServer("dummyserver1.example.org");
  ReplicationQueues rq1 =
      ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
  rq1.init(s1.getServerName().toString());
  ReplicationPeers rp1 =
      ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
  rp1.init();
  NodeFailoverWorker w1 =
      manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1,
          new UUID(1, 2));
  w1.start();
  w1.join(5000);
  assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
  String id = "1-" + server.getServerName().getServerName();
  assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
  manager.cleanOldLogs(file2, id, true);
  // log1 should be deleted
  assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}
Project: ditb    File: TestReplicationSinkManager.java
@Before
public void setUp() {
  replicationPeers = mock(ReplicationPeers.class);
  replicationEndpoint = mock(HBaseReplicationEndpoint.class);
  sinkManager = new ReplicationSinkManager(mock(HConnection.class),
                    PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
}
Project: pbase    File: ReplicationSourceManager.java
/**
 * Creates a replication manager and sets the watch on all the other registered region servers
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param replicationTracker the tracker of replication state changes
 * @param conf the configuration to use
 * @param server the server for this region server
 * @param fs the file system to use
 * @param logDir the directory that contains all wal directories of live RSs
 * @param oldLogDir the directory where old logs are archived
 * @param clusterId the UUID of this cluster
 */
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
    final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
    final Path oldLogDir, final UUID clusterId) {
  // CopyOnWriteArrayList is thread-safe.
  // Generally, reads outnumber writes.
  this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.replicationTracker = replicationTracker;
  this.server = server;
  this.walsById = new HashMap<String, SortedSet<String>>();
  this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
  this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.oldLogDir = oldLogDir;
  this.sleepBeforeFailover =
      conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
  this.clusterId = clusterId;
  this.replicationTracker.registerListener(this);
  this.replicationPeers.getAllPeerIds();
  // It's preferable to fail over one RS at a time, but with good zk servers
  // more could be processed at the same time.
  int nbWorkers = conf.getInt("replication.executor.workers", 1);
  // use a short 100ms sleep since this could be done inline with a RS startup
  // even if we fail, other region servers can take care of it
  this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
      100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>());
  ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
  tfb.setNameFormat("ReplicationExecutor-%d");
  tfb.setDaemon(true);
  this.executor.setThreadFactory(tfb.build());
  this.rand = new Random();
}
Project: pbase    File: ReplicationSourceManager.java
/**
 * @param rsZnode the znode of the region server whose queues are being claimed
 */
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final UUID clusterId) {
  super("Failover-for-"+rsZnode);
  this.rsZnode = rsZnode;
  this.rq = replicationQueues;
  this.rp = replicationPeers;
  this.clusterId = clusterId;
}
Project: pbase    File: VerifyReplication.java
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  ReplicationPeerZKImpl peer = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
        new Abortable() {
          @Override public void abort(String why, Throwable e) {}
          @Override public boolean isAborted() {return false;}
        });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
    if (pair == null) {
      throw new IOException("Couldn't get peer conf!");
    }
    Configuration peerConf = pair.getSecond();
    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occurred while trying to connect to the remote peer cluster", e);
  } finally {
    if (peer != null) {
      peer.close();
    }
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
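
ZKUtil.getZooKeeperClusterKey(...) renders the peer configuration as an HBase cluster key of the form quorum:port:znodeParent. A sketch of what a caller would receive (host names hypothetical):

// Hypothetical result for a peer backed by a three-node ZooKeeper ensemble:
//   "zk1.example.org,zk2.example.org,zk3.example.org:2181:/hbase"
String peerQuorumAddress = getPeerQuorumAddress(conf);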
Project: pbase    File: TestReplicationSourceManager.java
@Test
public void testCleanupFailoverQueues() throws Exception {
  final Server server = new DummyServer("hostname1.example.org");
  ReplicationQueues rq =
      ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
        server);
  rq.init(server.getServerName().toString());
  // populate some znodes in the peer znode
  SortedSet<String> files = new TreeSet<String>();
  files.add("log1");
  files.add("log2");
  for (String file : files) {
    rq.addLog("1", file);
  }
  Server s1 = new DummyServer("dummyserver1.example.org");
  ReplicationQueues rq1 =
      ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
  rq1.init(s1.getServerName().toString());
  ReplicationPeers rp1 =
      ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
  rp1.init();
  NodeFailoverWorker w1 =
      manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1,
          new UUID(1, 2));
  w1.start();
  w1.join(5000);
  assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
  String id = "1-" + server.getServerName().getServerName();
  assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id));
  manager.cleanOldLogs("log2", id, true);
  // log1 should be deleted
  assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id));
}
Project: pbase    File: TestReplicationSinkManager.java
@Before
public void setUp() {
  replicationPeers = mock(ReplicationPeers.class);
  replicationEndpoint = mock(HBaseReplicationEndpoint.class);
  sinkManager = new ReplicationSinkManager(mock(HConnection.class),
                    PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
}
Project: HIndex    File: ReplicationSinkManager.java
/**
 * Instantiate for a single replication peer cluster.
 * @param conn connection to the peer cluster
 * @param peerClusterId identifier of the peer cluster
 * @param replicationPeers manages peer clusters being replicated to
 * @param conf HBase configuration, used for determining replication source ratio and bad peer
 *          threshold
 */
public ReplicationSinkManager(HConnection conn, String peerClusterId,
    ReplicationPeers replicationPeers, Configuration conf) {
  this.conn = conn;
  this.peerClusterId = peerClusterId;
  this.replicationPeers = replicationPeers;
  this.badReportCounts = Maps.newHashMap();
  this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
  this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
                                      DEFAULT_BAD_SINK_THRESHOLD);
  this.random = new Random();
}
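
Both knobs read by this constructor are ordinary configuration entries, with defaults supplied by the constants referenced above. An illustrative override, values hypothetical:

Configuration conf = HBaseConfiguration.create();
// Consider half of the peer's region servers as candidate sinks.
conf.setFloat("replication.source.ratio", 0.5f);
// Tolerate five bad reports before a sink is dropped from the pool.
conf.setInt("replication.bad.sink.threshold", 5);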
Project: HIndex    File: ReplicationSourceManager.java
/**
 * Creates a replication manager and sets the watch on all the other registered region servers
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param replicationTracker the tracker of replication state changes
 * @param conf the configuration to use
 * @param stopper the stopper object for this region server
 * @param fs the file system to use
 * @param logDir the directory that contains all hlog directories of live RSs
 * @param oldLogDir the directory where old logs are archived
 * @param clusterId the UUID of this cluster
 */
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
    final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
    final Path oldLogDir, final UUID clusterId) {
  this.sources = new ArrayList<ReplicationSourceInterface>();
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.replicationTracker = replicationTracker;
  this.stopper = stopper;
  this.hlogsById = new HashMap<String, SortedSet<String>>();
  this.oldsources = new ArrayList<ReplicationSourceInterface>();
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.oldLogDir = oldLogDir;
  this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
  this.clusterId = clusterId;
  this.replicationTracker.registerListener(this);
  this.replicationPeers.getAllPeerIds();
  // It's preferable to fail over one RS at a time, but with good zk servers
  // more could be processed at the same time.
  int nbWorkers = conf.getInt("replication.executor.workers", 1);
  // use a short 100ms sleep since this could be done inline with a RS startup
  // even if we fail, other region servers can take care of it
  this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
      100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>());
  ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
  tfb.setNameFormat("ReplicationExecutor-%d");
  this.executor.setThreadFactory(tfb.build());
  this.rand = new Random();
}
Project: HIndex    File: ReplicationSourceManager.java
/**
 * @param rsZnode the znode of the region server whose queues are being claimed
 */
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final UUID clusterId) {
  super("Failover-for-"+rsZnode);
  this.rsZnode = rsZnode;
  this.rq = replicationQueues;
  this.rp = replicationPeers;
  this.clusterId = clusterId;
}
Project: HIndex    File: VerifyReplication.java
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  ReplicationPeer peer = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
        new Abortable() {
          @Override public void abort(String why, Throwable e) {}
          @Override public boolean isAborted() {return false;}
        });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Configuration peerConf = rp.getPeerConf(peerId);
    if (peerConf == null) {
      throw new IOException("Couldn't get peer conf!");
    }

    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occurred while trying to connect to the remote peer cluster", e);
  } finally {
    if (peer != null) {
      peer.close();
    }
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
Project: hbase    File: TestReplicationSourceManager.java
@Test
public void testCleanupFailoverQueues() throws Exception {
  Server server = new DummyServer("hostname1.example.org");
  ReplicationQueueStorage rq = ReplicationStorageFactory
      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
  // populate some znodes in the peer znode
  SortedSet<String> files = new TreeSet<>();
  String group = "testgroup";
  String file1 = group + ".log1";
  String file2 = group + ".log2";
  files.add(file1);
  files.add(file2);
  for (String file : files) {
    rq.addWAL(server.getServerName(), "1", file);
  }
  Server s1 = new DummyServer("dummyserver1.example.org");
  ReplicationPeers rp1 =
      ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
  rp1.init();
  NodeFailoverWorker w1 =
      manager.new NodeFailoverWorker(server.getServerName());
  w1.run();
  assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
  String id = "1-" + server.getServerName().getServerName();
  assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
  manager.cleanOldLogs(file2, id, true);
  // log1 should be deleted
  assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}
Project: hbase    File: TestReplicationSourceManager.java
/**
 * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
 * corresponding ReplicationSourceInterface correctly cleans up the corresponding
 * replication queue and ReplicationPeer.
 * See HBASE-16096.
 * @throws Exception
 */
@Test
public void testPeerRemovalCleanup() throws Exception {
  String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
  final String peerId = "FakePeer";
  final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
      .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
  try {
    DummyServer server = new DummyServer();
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
    // initialization to throw an exception.
    conf.set("replication.replicationsource.implementation",
        FailInitializeDummyReplicationSource.class.getName());
    final ReplicationPeers rp = manager.getReplicationPeers();
    // Set up the znode and ReplicationPeer for the fake peer
    // Don't wait for replication source to initialize, we know it won't.
    addPeerAndWait(peerId, peerConfig, false);

    // Sanity check
    assertNull(manager.getSource(peerId));

    // Create a replication queue for the fake peer
    rq.addWAL(server.getServerName(), peerId, "FakeFile");
    // Unregister peer, this should remove the peer and clear all queues associated with it
    // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
    removePeerAndWait(peerId);
    assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
  } finally {
    conf.set("replication.replicationsource.implementation", replicationSourceImplName);
    removePeerAndWait(peerId);
  }
}
Project: hbase    File: TestReplicationSinkManager.java
@Before
public void setUp() {
  replicationPeers = mock(ReplicationPeers.class);
  replicationEndpoint = mock(HBaseReplicationEndpoint.class);
  sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
                    PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
}
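
A typical continuation of this setup stubs the mocked endpoint's region-server list before exercising the manager. A sketch along those lines; the server name is hypothetical:

List<ServerName> serverNames = Lists.newArrayList(
    ServerName.valueOf("sink1.example.org", 16020, 1L));
when(replicationEndpoint.getRegionServers()).thenReturn(serverNames);
// With region servers reported, the manager can pick its subset of sinks.
sinkManager.chooseSinks();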
Project: PyroDB    File: ReplicationSinkManager.java
/**
 * Instantiate for a single replication peer cluster.
 * @param conn connection to the peer cluster
 * @param peerClusterId identifier of the peer cluster
 * @param replicationPeers manages peer clusters being replicated to
 * @param conf HBase configuration, used for determining replication source ratio and bad peer
 *          threshold
 */
public ReplicationSinkManager(HConnection conn, String peerClusterId,
    ReplicationPeers replicationPeers, Configuration conf) {
  this.conn = conn;
  this.peerClusterId = peerClusterId;
  this.replicationPeers = replicationPeers;
  this.badReportCounts = Maps.newHashMap();
  this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
  this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
                                      DEFAULT_BAD_SINK_THRESHOLD);
  this.random = new Random();
}
Project: PyroDB    File: ReplicationSourceManager.java
/**
 * Creates a replication manager and sets the watch on all the other registered region servers
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param replicationTracker the tracker of replication state changes
 * @param conf the configuration to use
 * @param stopper the stopper object for this region server
 * @param fs the file system to use
 * @param logDir the directory that contains all hlog directories of live RSs
 * @param oldLogDir the directory where old logs are archived
 * @param clusterId the UUID of this cluster
 */
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
    final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
    final Path oldLogDir, final UUID clusterId) {
  this.sources = new ArrayList<ReplicationSourceInterface>();
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.replicationTracker = replicationTracker;
  this.stopper = stopper;
  this.hlogsById = new HashMap<String, SortedSet<String>>();
  this.oldsources = new ArrayList<ReplicationSourceInterface>();
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.oldLogDir = oldLogDir;
  this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
  this.clusterId = clusterId;
  this.replicationTracker.registerListener(this);
  this.replicationPeers.getAllPeerIds();
  // It's preferable to fail over one RS at a time, but with good zk servers
  // more could be processed at the same time.
  int nbWorkers = conf.getInt("replication.executor.workers", 1);
  // use a short 100ms sleep since this could be done inline with a RS startup
  // even if we fail, other region servers can take care of it
  this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
      100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>());
  ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
  tfb.setNameFormat("ReplicationExecutor-%d");
  this.executor.setThreadFactory(tfb.build());
  this.rand = new Random();
}
Project: PyroDB    File: ReplicationSourceManager.java
/**
 * @param rsZnode the znode of the region server whose queues are being claimed
 */
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final UUID clusterId) {
  super("Failover-for-"+rsZnode);
  this.rsZnode = rsZnode;
  this.rq = replicationQueues;
  this.rp = replicationPeers;
  this.clusterId = clusterId;
}
Project: PyroDB    File: VerifyReplication.java
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  ReplicationPeer peer = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
        new Abortable() {
          @Override public void abort(String why, Throwable e) {}
          @Override public boolean isAborted() {return false;}
        });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Configuration peerConf = rp.getPeerConf(peerId);
    if (peerConf == null) {
      throw new IOException("Couldn't get peer conf!");
    }

    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occurred while trying to connect to the remote peer cluster", e);
  } finally {
    if (peer != null) {
      peer.close();
    }
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
Project: c5    File: ReplicationSinkManager.java
/**
 * Instantiate for a single replication peer cluster.
 * @param conn connection to the peer cluster
 * @param peerClusterId identifier of the peer cluster
 * @param replicationPeers manages peer clusters being replicated to
 * @param conf HBase configuration, used for determining replication source ratio and bad peer
 *          threshold
 */
public ReplicationSinkManager(HConnection conn, String peerClusterId,
    ReplicationPeers replicationPeers, Configuration conf) {
  this.conn = conn;
  this.peerClusterId = peerClusterId;
  this.replicationPeers = replicationPeers;
  this.badReportCounts = Maps.newHashMap();
  this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
  this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
                                      DEFAULT_BAD_SINK_THRESHOLD);
  this.random = new Random();
}
Project: c5    File: ReplicationSourceManager.java
/**
 * Creates a replication manager and sets the watch on all the other registered region servers
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param replicationTracker the tracker of replication state changes
 * @param conf the configuration to use
 * @param stopper the stopper object for this region server
 * @param fs the file system to use
 * @param logDir the directory that contains all hlog directories of live RSs
 * @param oldLogDir the directory where old logs are archived
 * @param clusterId the UUID of this cluster
 */
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
    final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
    final Path oldLogDir, final UUID clusterId) {
  this.sources = new ArrayList<ReplicationSourceInterface>();
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.replicationTracker = replicationTracker;
  this.stopper = stopper;
  this.hlogsById = new HashMap<String, SortedSet<String>>();
  this.oldsources = new ArrayList<ReplicationSourceInterface>();
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.oldLogDir = oldLogDir;
  this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
  this.clusterId = clusterId;
  this.replicationTracker.registerListener(this);
  this.replicationPeers.getAllPeerIds();
  // It's preferable to fail over one RS at a time, but with good zk servers
  // more could be processed at the same time.
  int nbWorkers = conf.getInt("replication.executor.workers", 1);
  // use a short 100ms sleep since this could be done inline with a RS startup
  // even if we fail, other region servers can take care of it
  this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
      100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>());
  ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
  tfb.setNameFormat("ReplicationExecutor-%d");
  this.executor.setThreadFactory(tfb.build());
  this.rand = new Random();
}
Project: c5    File: ReplicationSourceManager.java
/**
 * @param rsZnode the znode of the region server whose queues are being claimed
 */
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final UUID clusterId) {
  super("Failover-for-"+rsZnode);
  this.rsZnode = rsZnode;
  this.rq = replicationQueues;
  this.rp = replicationPeers;
  this.clusterId = clusterId;
}
Project: c5    File: ReplicationSource.java
/**
 * Instantiation method used by region servers
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @throws IOException
 */
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId) throws IOException {
  this.stopper = stopper;
  this.conf = conf;
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
  this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
      maxRetriesMultiplier * maxRetriesMultiplier);
  this.queue =
      new PriorityBlockingQueue<Path>(
          conf.getInt("hbase.regionserver.maxlogs", 32),
          new LogsComparator());
  // TODO: Make this connection replication-specific, with replication settings
  // such as which compression codec to use when passing Cells.
  this.conn = HConnectionManager.getConnection(conf);
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.manager = manager;
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);
  this.fs = fs;
  this.metrics = new MetricsSource(peerClusterZnode);
  this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
  this.clusterId = clusterId;

  this.peerClusterZnode = peerClusterZnode;
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();
  this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf);
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
}
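
Note that this older variant derives socketTimeoutMultiplier as the square of maxRetriesMultiplier (10 * 10 = 100 by default), so raising the retry multiplier implicitly raises the socket-timeout backoff unless it is pinned explicitly. An illustrative override, values hypothetical:

Configuration conf = HBaseConfiguration.create();
conf.setInt("replication.source.maxretriesmultiplier", 20);
// Without this line, init() above would square the multiplier (20 * 20 = 400).
conf.setInt("replication.source.socketTimeoutMultiplier", 120);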
Project: pbase    File: ReplicationSource.java
/**
 * Instantiation method used by region servers
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for replication source
 * @throws IOException
 */
@Override
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics)
        throws IOException {
  this.stopper = stopper;
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
  this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
  this.queue =
      new PriorityBlockingQueue<Path>(
          this.conf.getInt("hbase.regionserver.maxlogs", 32),
          new LogsComparator());
  long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.manager = manager;
  this.fs = fs;
  this.metrics = metrics;
  this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
  this.clusterId = clusterId;

  this.peerClusterZnode = peerClusterZnode;
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
  this.replicationEndpoint = replicationEndpoint;

  this.replicateContext = new ReplicationEndpoint.ReplicateContext();
}
Project: HIndex    File: ReplicationSource.java
/**
 * Instantiation method used by region servers
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @throws IOException
 */
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId) throws IOException {
  this.stopper = stopper;
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
  this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
      maxRetriesMultiplier * maxRetriesMultiplier);
  this.queue =
      new PriorityBlockingQueue<Path>(
          this.conf.getInt("hbase.regionserver.maxlogs", 32),
          new LogsComparator());
  // TODO: Make this connection replication-specific, with replication settings
  // such as which compression codec to use when passing Cells.
  this.conn = HConnectionManager.getConnection(this.conf);
  long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.manager = manager;
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);
  this.fs = fs;
  this.metrics = new MetricsSource(peerClusterZnode);
  this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
  this.clusterId = clusterId;

  this.peerClusterZnode = peerClusterZnode;
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();
  this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
}
Project: HIndex    File: TestReplicationSinkManager.java
@Before
public void setUp() {
  replicationPeers = mock(ReplicationPeers.class);
  sinkManager = new ReplicationSinkManager(mock(HConnection.class),
                    PEER_CLUSTER_ID, replicationPeers, new Configuration());
}
Project: hbase    File: ReplicationSourceManager.java
/**
 * Creates a replication manager and sets the watch on all the other registered region servers
 * @param queueStorage the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param replicationTracker the tracker of replication state changes
 * @param conf the configuration to use
 * @param server the server for this region server
 * @param fs the file system to use
 * @param logDir the directory that contains all wal directories of live RSs
 * @param oldLogDir the directory where old logs are archived
 * @param clusterId the UUID of this cluster
 */
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
    ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
    Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
    WALFileLengthProvider walFileLengthProvider) throws IOException {
  // The sources map is a ConcurrentHashMap, which is thread-safe.
  // Generally, reads outnumber writes.
  this.sources = new ConcurrentHashMap<>();
  this.queueStorage = queueStorage;
  this.replicationPeers = replicationPeers;
  this.replicationTracker = replicationTracker;
  this.server = server;
  this.walsById = new ConcurrentHashMap<>();
  this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
  this.oldsources = new ArrayList<>();
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.oldLogDir = oldLogDir;
  this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
  this.clusterId = clusterId;
  this.walFileLengthProvider = walFileLengthProvider;
  this.replicationTracker.registerListener(this);
  // It's preferable to fail over one RS at a time, but with good zk servers
  // more could be processed at the same time.
  int nbWorkers = conf.getInt("replication.executor.workers", 1);
  // use a short 100ms sleep since this could be done inline with a RS startup
  // even if we fail, other region servers can take care of it
  this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>());
  ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
  tfb.setNameFormat("ReplicationExecutor-%d");
  tfb.setDaemon(true);
  this.executor.setThreadFactory(tfb.build());
  this.latestPaths = new HashSet<Path>();
  replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
    HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
    HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
  connection = ConnectionFactory.createConnection(conf);
}
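
The last two settings come from HConstants and are grounded in the constructor body above. A sketch of enabling bulk-load replication and adjusting the serial-replication wait; the values are illustrative:

Configuration conf = HBaseConfiguration.create();
// Replicate bulk-loaded HFiles in addition to WAL edits.
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
// How long a serial replication source waits for preceding edits to be pushed.
conf.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 30000);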
Project: PyroDB    File: ReplicationSource.java
/**
 * Instantiation method used by region servers
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode the name of our znode
 * @param clusterId unique UUID for the cluster
 * @throws IOException
 */
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId) throws IOException {
  this.stopper = stopper;
  this.conf = HBaseConfiguration.create(conf);
  decorateConf();
  this.replicationQueueSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
  this.replicationQueueNbCapacity =
      this.conf.getInt("replication.source.nb.capacity", 25000);
  this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
  this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
      maxRetriesMultiplier * maxRetriesMultiplier);
  this.queue =
      new PriorityBlockingQueue<Path>(
          this.conf.getInt("hbase.regionserver.maxlogs", 32),
          new LogsComparator());
  // TODO: Make this connection replication-specific, with replication settings
  // such as which compression codec to use when passing Cells.
  this.conn = HConnectionManager.getConnection(this.conf);
  long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
  this.replicationQueues = replicationQueues;
  this.replicationPeers = replicationPeers;
  this.manager = manager;
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);
  this.fs = fs;
  this.metrics = new MetricsSource(peerClusterZnode);
  this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
  this.clusterId = clusterId;

  this.peerClusterZnode = peerClusterZnode;
  this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();
  this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
}
Project: PyroDB    File: TestReplicationSinkManager.java
@Before
public void setUp() {
  replicationPeers = mock(ReplicationPeers.class);
  sinkManager = new ReplicationSinkManager(mock(HConnection.class),
                    PEER_CLUSTER_ID, replicationPeers, new Configuration());
}
Project: c5    File: TestReplicationSinkManager.java
@Before
public void setUp() {
  replicationPeers = mock(ReplicationPeers.class);
  sinkManager = new ReplicationSinkManager(mock(HConnection.class),
                    PEER_CLUSTER_ID, replicationPeers, new Configuration());
}
Project: ditb    File: ReplicationSourceInterface.java
/**
 * Initializer for the source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode the name of the znode for this source's queue
 * @param clusterId the UUID of this cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for the replication source
 * @throws IOException
 */
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics) throws IOException;
Project: pbase    File: ReplicationSourceInterface.java
/**
 * Initializer for the source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the interface for manipulating replication queues
 * @param replicationPeers the interface for accessing replication peers
 * @param stopper the stopper object for this region server
 * @param peerClusterZnode the name of the znode for this source's queue
 * @param clusterId the UUID of this cluster
 * @param replicationEndpoint the replication endpoint implementation
 * @param metrics metrics for the replication source
 * @throws IOException
 */
public void init(final Configuration conf, final FileSystem fs,
    final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
    final ReplicationPeers replicationPeers, final Stoppable stopper,
    final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
    final MetricsSource metrics) throws IOException;