Java 类org.apache.hadoop.hbase.replication.ChainWALEntryFilter 实例源码

项目:ditb    文件:RegionReplicaReplicationEndpoint.java   
@Override
public WALEntryFilter getWALEntryfilter() {
  // Combine the parent's filter with the skip-replayed-edits filter.
  // Either may be absent (null); only chain when both are present.
  WALEntryFilter parentFilter = super.getWALEntryfilter();
  WALEntryFilter skipFilter = getSkipReplayedEditsFilter();

  if (parentFilter == null || skipFilter == null) {
    // At most one filter exists — return it (or null if neither does).
    return parentFilter != null ? parentFilter : skipFilter;
  }

  // Both filters exist: an entry must pass both to be replicated.
  return new ChainWALEntryFilter(Lists.newArrayList(parentFilter, skipFilter));
}
项目:hbase    文件:ReplicationSource.java   
private void initializeWALEntryFilter() {
  // Assemble the WAL entry filter chain: the system-table filter always runs
  // first, then any filter supplied by the replication endpoint, and finally
  // the cluster-marking filter.
  ArrayList<WALEntryFilter> chain = new ArrayList<>();
  chain.add(new SystemTableWALEntryFilter());
  WALEntryFilter endpointFilter = this.replicationEndpoint.getWALEntryfilter();
  if (endpointFilter != null) {
    chain.add(endpointFilter);
  }
  chain.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
  this.walEntryFilter = new ChainWALEntryFilter(chain);
}
项目:ditb    文件:ReplicationSource.java   
@Override
public void run() {
  // Entry point of the replication source thread: starts the endpoint,
  // builds the WAL entry filter chain, resolves the peer cluster id, and
  // spawns one worker thread per WAL group.
  // mark we are running now
  this.sourceRunning = true;
  try {
    // start the endpoint, connect to the cluster
    // Blocks until the endpoint service finishes starting (Guava Service).
    Service.State state = replicationEndpoint.start().get();
    if (state != Service.State.RUNNING) {
      LOG.warn("ReplicationEndpoint was not started. Exiting");
      uninitialize();
      return;
    }
  } catch (Exception ex) {
    // Startup failure is fatal for this source; rethrow so the thread dies visibly.
    LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
    throw new RuntimeException(ex);
  }

  // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
  ArrayList<WALEntryFilter> filters = Lists.newArrayList(
    (WALEntryFilter)new SystemTableWALEntryFilter());
  WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
  if (filterFromEndpoint != null) {
    filters.add(filterFromEndpoint);
  }
  this.walEntryFilter = new ChainWALEntryFilter(filters);

  int sleepMultiplier = 1;
  // delay this until we are in an asynchronous thread
  // Retry until the peer UUID is known or this source is stopped;
  // sleepForRetries presumably backs off and returns whether it slept — TODO confirm.
  while (this.isSourceActive() && this.peerClusterId == null) {
    this.peerClusterId = replicationEndpoint.getPeerUUID();
    if (this.isSourceActive() && this.peerClusterId == null) {
      if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
        sleepMultiplier++;
      }
    }
  }

  // In rare case, zookeeper setting may be messed up. That leads to the incorrect
  // peerClusterId value, which is the same as the source clusterId
  // NOTE(review): execution continues after terminate() here — verify that
  // terminate() stops this thread, otherwise workers below would still start.
  if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
    this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
        + peerClusterId + " which is not allowed by ReplicationEndpoint:"
        + replicationEndpoint.getClass().getName(), null, false);
  }
  LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
  // start workers
  // One worker per WAL group; putIfAbsent guards against a concurrent starter.
  for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
    String walGroupId = entry.getKey();
    PriorityBlockingQueue<Path> queue = entry.getValue();
    final ReplicationSourceWorkerThread worker =
        new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
    ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
    } else {
      LOG.debug("Starting up worker for wal group " + walGroupId);
      worker.startup();
    }
  }
}