Java 类org.apache.hadoop.hbase.replication.ReplicationPeer 实例源码

项目:ditb    文件:TestReplicationAdmin.java   
/**
 * Tests that the peer configuration used by ReplicationAdmin contains all
 * the peer's properties.
 */
@Test
public void testPeerConfig() throws Exception {
  ReplicationPeerConfig config = new ReplicationPeerConfig();
  config.setClusterKey(KEY_ONE);
  config.getConfiguration().put("key1", "value1");
  config.getConfiguration().put("key2", "value2");
  admin.addPeer(ID_ONE, config, null);

  List<ReplicationPeer> peers = admin.listReplicationPeers();
  assertEquals(1, peers.size());
  ReplicationPeer peerOne = peers.get(0);
  assertNotNull(peerOne);
  assertEquals("value1", peerOne.getConfiguration().get("key1"));
  assertEquals("value2", peerOne.getConfiguration().get("key2"));

  admin.removePeer(ID_ONE);
}
项目:ditb    文件:ReplicationAdmin.java   
@VisibleForTesting
List<ReplicationPeer> listReplicationPeers() {
  Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
  if (peers == null || peers.size() <= 0) {
    return null;
  }
  List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
  for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
    String peerId = peerEntry.getKey();
    try {
      Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
      Configuration peerConf = pair.getSecond();
      ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
          parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
      listOfPeers.add(peer);
    } catch (ReplicationException e) {
      LOG.warn("Failed to get valid replication peers. "
          + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
          + e.getMessage());
      LOG.debug("Failure details to get valid replication peers.", e);
      continue;
    }
  }
  return listOfPeers;
}
项目:ditb    文件:ReplicationSourceManager.java   
/**
 * Add sources for the given peer cluster on this region server. For the newly added peer, we only
 * need to enqueue the latest log of each wal group and do replication
 * @param id the id of the peer cluster
 * @return the source that was created
 * @throws IOException
 */
protected ReplicationSourceInterface addSource(String id) throws IOException,
    ReplicationException {
  ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
  ReplicationPeer peer = replicationPeers.getPeer(id);
  ReplicationSourceInterface src =
      getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
        this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
  synchronized (this.walsById) {
    this.sources.add(src);
    Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
    this.walsById.put(id, walsByGroup);
    // Add the latest wal to that source's queue
    synchronized (latestPaths) {
      if (this.latestPaths.size() > 0) {
        for (Path logPath : latestPaths) {
          String name = logPath.getName();
          String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
          SortedSet<String> logs = new TreeSet<String>();
          logs.add(name);
          walsByGroup.put(walPrefix, logs);
          try {
            this.replicationQueues.addLog(id, name);
          } catch (ReplicationException e) {
            String message =
                "Cannot add log to queue when creating a new source, queueId=" + id
                    + ", filename=" + name;
            server.stop(message);
            throw e;
          }
          src.enqueueLog(logPath);
        }
      }
    }
  }
  src.startup();
  return src;
}
项目:pbase    文件:ReplicationSourceManager.java   
/**
 * Add a new normal source to this region server
 * @param id the id of the peer cluster
 * @return the source that was created
 * @throws IOException
 */
protected ReplicationSourceInterface addSource(String id) throws IOException,
    ReplicationException {
  ReplicationPeerConfig peerConfig
    = replicationPeers.getReplicationPeerConfig(id);
  ReplicationPeer peer = replicationPeers.getPeer(id);
  ReplicationSourceInterface src =
      getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
        this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
  synchronized (this.walsById) {
    this.sources.add(src);
    this.walsById.put(id, new TreeSet<String>());
    // Add the latest wal to that source's queue
    if (this.latestPath != null) {
      String name = this.latestPath.getName();
      this.walsById.get(id).add(name);
      try {
        this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
      } catch (ReplicationException e) {
        String message =
            "Cannot add log to queue when creating a new source, queueId="
                + src.getPeerClusterZnode() + ", filename=" + name;
        server.stop(message);
        throw e;
      }
      src.enqueueLog(this.latestPath);
    }
  }
  src.startup();
  return src;
}
项目:HIndex    文件:VerifyReplication.java   
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  ReplicationPeer peer = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
        new Abortable() {
          @Override public void abort(String why, Throwable e) {}
          @Override public boolean isAborted() {return false;}
        });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Configuration peerConf = rp.getPeerConf(peerId);
    if (peerConf == null) {
      throw new IOException("Couldn't get peer conf!");
    }

    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occured while trying to connect to the remove peer cluster", e);
  } finally {
    if (peer != null) {
      peer.close();
    }
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
项目:hbase    文件:ReplicationSourceManager.java   
/**
 * Factory method to create a replication source
 * @param queueId the id of the replication queue
 * @return the created source
 */
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
    throws IOException {
  ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);

  MetricsSource metrics = new MetricsSource(queueId);
  // init replication source
  src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
    walFileLengthProvider, metrics);
  return src;
}
项目:hbase    文件:ReplicationSourceManager.java   
/**
 * Add a normal source for the given peer on this region server. Meanwhile, add new replication
 * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
 * group and do replication
 * @param peerId the id of the replication peer
 * @return the source that was created
 */
@VisibleForTesting
ReplicationSourceInterface addSource(String peerId) throws IOException {
  ReplicationPeer peer = replicationPeers.getPeer(peerId);
  ReplicationSourceInterface src = createSource(peerId, peer);
  // synchronized on latestPaths to avoid missing the new log
  synchronized (this.latestPaths) {
    this.sources.put(peerId, src);
    Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
    this.walsById.put(peerId, walsByGroup);
    // Add the latest wal to that source's queue
    if (this.latestPaths.size() > 0) {
      for (Path logPath : latestPaths) {
        String name = logPath.getName();
        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
        SortedSet<String> logs = new TreeSet<>();
        logs.add(name);
        walsByGroup.put(walPrefix, logs);
        // Abort RS and throw exception to make add peer failed
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
        src.enqueueLog(logPath);
      }
    }
  }
  src.startup();
  return src;
}
项目:hbase    文件:RecoveredReplicationSource.java   
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
  super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
    clusterId, walFileLengthProvider, metrics);
  this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
项目:hbase    文件:ReplicationSource.java   
/**
 * Instantiation method used by region servers
 * @param conf configuration to use
 * @param fs file system to use
 * @param manager replication manager to ping to
 * @param server the server for this region server
 * @param queueId the id of our replication queue
 * @param clusterId unique UUID for the cluster
 * @param metrics metrics for replication source
 */
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
  this.server = server;
  this.conf = HBaseConfiguration.create(conf);
  this.waitOnEndpointSeconds =
    this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
  decorateConf();
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
  this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
  this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
  this.queueStorage = queueStorage;
  this.replicationPeer = replicationPeer;
  this.manager = manager;
  this.fs = fs;
  this.metrics = metrics;
  this.clusterId = clusterId;

  this.queueId = queueId;
  this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
  // ReplicationQueueInfo parses the peerId out of the znode for us
  this.peerId = this.replicationQueueInfo.getPeerId();
  this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

  defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
  currentBandwidth = getCurrentBandwidth();
  this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
  this.totalBufferUsed = manager.getTotalBufferUsed();
  this.walFileLengthProvider = walFileLengthProvider;
  LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
      + ", currentBandwidth=" + this.currentBandwidth);
}
项目:hbase    文件:TestReplicationSourceManager.java   
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
    throws IOException {
  throw new IOException("Failing deliberately");
}
项目:PyroDB    文件:VerifyReplication.java   
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  ReplicationPeer peer = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
        new Abortable() {
          @Override public void abort(String why, Throwable e) {}
          @Override public boolean isAborted() {return false;}
        });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Configuration peerConf = rp.getPeerConf(peerId);
    if (peerConf == null) {
      throw new IOException("Couldn't get peer conf!");
    }

    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occured while trying to connect to the remove peer cluster", e);
  } finally {
    if (peer != null) {
      peer.close();
    }
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
项目:ditb    文件:ReplicationAdmin.java   
/**
 * Connect to peer and check the table descriptor on peer:
 * <ol>
 * <li>Create the same table on peer when not exist.</li>
 * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
 * </ol>
 * @param tableName name of the table to sync to the peer
 * @param splits table split keys
 * @throws IOException
 */
private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
    throws IOException {
  List<ReplicationPeer> repPeers = listReplicationPeers();
  if (repPeers == null || repPeers.size() <= 0) {
    throw new IllegalArgumentException("Found no peer cluster for replication.");
  }

  final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());

  for (ReplicationPeer repPeer : repPeers) {
    Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
    // TODO Currently peer TableCFs will not include namespace so we need to check only for table
    // name without namespace in it. Need to correct this logic once we fix HBASE-11386.
    if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
      continue;
    }

    Configuration peerConf = repPeer.getConfiguration();
    HTableDescriptor htd = null;
    try (Connection conn = ConnectionFactory.createConnection(peerConf);
        Admin admin = this.connection.getAdmin();
        Admin repHBaseAdmin = conn.getAdmin()) {
      htd = admin.getTableDescriptor(tableName);
      HTableDescriptor peerHtd = null;
      if (!repHBaseAdmin.tableExists(tableName)) {
        repHBaseAdmin.createTable(htd, splits);
      } else {
        peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
        if (peerHtd == null) {
          throw new IllegalArgumentException("Failed to get table descriptor for table "
              + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
        } else if (!peerHtd.equals(htd)) {
          throw new IllegalArgumentException("Table " + tableName.getNameAsString()
              + " exists in peer cluster " + repPeer.getId()
              + ", but the table descriptors are not same when comapred with source cluster."
              + " Thus can not enable the table's replication switch.");
        }
      }
    }
  }
}
项目:LCIndex-HBase-0.94.16    文件:VerifyReplication.java   
/**
 * Sets up the actual job.
 *
 * @param conf  The current configuration.
 * @param args  The command line parameters.
 * @return The newly created job.
 * @throws java.io.IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
  if (!doCommandLine(args)) {
    return null;
  }
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
    throw new IOException("Replication needs to be enabled to verify it.");
  }
  HConnectionManager.execute(new HConnectable<Void>(conf) {
    @Override
    public Void connect(HConnection conn) throws IOException {
      try {
        ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
            conn.getZooKeeperWatcher());
        // Just verifying it we can connect
        ReplicationPeer peer = zk.getPeer(peerId);
        if (peer == null) {
          throw new IOException("Couldn't get access to the slave cluster," +
              "please see the log");
        }
      } catch (KeeperException ex) {
        throw new IOException("Couldn't get access to the slave cluster" +
            " because: ", ex);
      }
      return null;
    }
  });
  conf.set(NAME+".peerId", peerId);
  conf.set(NAME+".tableName", tableName);
  conf.setLong(NAME+".startTime", startTime);
  conf.setLong(NAME+".endTime", endTime);
  if (families != null) {
    conf.set(NAME+".families", families);
  }
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(VerifyReplication.class);

  Scan scan = new Scan();
  if (startTime != 0) {
    scan.setTimeRange(startTime,
        endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
  }
  if(families != null) {
    String[] fams = families.split(",");
    for(String fam : fams) {
      scan.addFamily(Bytes.toBytes(fam));
    }
  }
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      Verifier.class, null, null, job);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  return job;
}
项目:IRIndex    文件:VerifyReplication.java   
/**
 * Sets up the actual job.
 *
 * @param conf  The current configuration.
 * @param args  The command line parameters.
 * @return The newly created job.
 * @throws java.io.IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
  if (!doCommandLine(args)) {
    return null;
  }
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
    throw new IOException("Replication needs to be enabled to verify it.");
  }
  HConnectionManager.execute(new HConnectable<Void>(conf) {
    @Override
    public Void connect(HConnection conn) throws IOException {
      try {
        ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
            conn.getZooKeeperWatcher());
        // Just verifying it we can connect
        ReplicationPeer peer = zk.getPeer(peerId);
        if (peer == null) {
          throw new IOException("Couldn't get access to the slave cluster," +
              "please see the log");
        }
      } catch (KeeperException ex) {
        throw new IOException("Couldn't get access to the slave cluster" +
            " because: ", ex);
      }
      return null;
    }
  });
  conf.set(NAME+".peerId", peerId);
  conf.set(NAME+".tableName", tableName);
  conf.setLong(NAME+".startTime", startTime);
  conf.setLong(NAME+".endTime", endTime);
  if (families != null) {
    conf.set(NAME+".families", families);
  }
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(VerifyReplication.class);

  Scan scan = new Scan();
  if (startTime != 0) {
    scan.setTimeRange(startTime,
        endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
  }
  if(families != null) {
    String[] fams = families.split(",");
    for(String fam : fams) {
      scan.addFamily(Bytes.toBytes(fam));
    }
  }
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      Verifier.class, null, null, job);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  return job;
}
项目:RStore    文件:VerifyReplication.java   
/**
 * Sets up the actual job.
 *
 * @param conf  The current configuration.
 * @param args  The command line parameters.
 * @return The newly created job.
 * @throws java.io.IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
  if (!doCommandLine(args)) {
    return null;
  }
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
    throw new IOException("Replication needs to be enabled to verify it.");
  }
  HConnectionManager.execute(new HConnectable<Void>(conf) {
    @Override
    public Void connect(HConnection conn) throws IOException {
      try {
        ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
            conn.getZooKeeperWatcher());
        // Just verifying it we can connect
        ReplicationPeer peer = zk.getPeer(peerId);
        if (peer == null) {
          throw new IOException("Couldn't get access to the slave cluster," +
              "please see the log");
        }
      } catch (KeeperException ex) {
        throw new IOException("Couldn't get access to the slave cluster" +
            " because: ", ex);
      }
      return null;
    }
  });
  conf.set(NAME+".peerId", peerId);
  conf.set(NAME+".tableName", tableName);
  conf.setLong(NAME+".startTime", startTime);
  conf.setLong(NAME+".endTime", endTime);
  if (families != null) {
    conf.set(NAME+".families", families);
  }
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(VerifyReplication.class);

  Scan scan = new Scan();
  if (startTime != 0) {
    scan.setTimeRange(startTime,
        endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
  }
  if(families != null) {
    String[] fams = families.split(",");
    for(String fam : fams) {
      scan.addFamily(Bytes.toBytes(fam));
    }
  }
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      Verifier.class, null, null, job);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  return job;
}
项目:HBase-Research    文件:VerifyReplication.java   
/**
 * Sets up the actual job.
 *
 * @param conf  The current configuration.
 * @param args  The command line parameters.
 * @return The newly created job.
 * @throws java.io.IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
  if (!doCommandLine(args)) {
    return null;
  }
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
    throw new IOException("Replication needs to be enabled to verify it.");
  }
  HConnectionManager.execute(new HConnectable<Void>(conf) {
    @Override
    public Void connect(HConnection conn) throws IOException {
      try {
        ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
            conn.getZooKeeperWatcher());
        // Just verifying it we can connect
        ReplicationPeer peer = zk.getPeer(peerId);
        if (peer == null) {
          throw new IOException("Couldn't get access to the slave cluster," +
              "please see the log");
        }
      } catch (KeeperException ex) {
        throw new IOException("Couldn't get access to the slave cluster" +
            " because: ", ex);
      }
      return null;
    }
  });
  conf.set(NAME+".peerId", peerId);
  conf.set(NAME+".tableName", tableName);
  conf.setLong(NAME+".startTime", startTime);
  conf.setLong(NAME+".endTime", endTime);
  if (families != null) {
    conf.set(NAME+".families", families);
  }
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(VerifyReplication.class);

  Scan scan = new Scan();
  if (startTime != 0) {
    scan.setTimeRange(startTime,
        endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
  }
  if(families != null) {
    String[] fams = families.split(",");
    for(String fam : fams) {
      scan.addFamily(Bytes.toBytes(fam));
    }
  }
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      Verifier.class, null, null, job);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  return job;
}
项目:hbase-0.94.8-qod    文件:VerifyReplication.java   
/**
 * Sets up the actual job.
 *
 * @param conf  The current configuration.
 * @param args  The command line parameters.
 * @return The newly created job.
 * @throws java.io.IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
  if (!doCommandLine(args)) {
    return null;
  }
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
    throw new IOException("Replication needs to be enabled to verify it.");
  }
  HConnectionManager.execute(new HConnectable<Void>(conf) {
    @Override
    public Void connect(HConnection conn) throws IOException {
      try {
        ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
            conn.getZooKeeperWatcher());
        // Just verifying it we can connect
        ReplicationPeer peer = zk.getPeer(peerId);
        if (peer == null) {
          throw new IOException("Couldn't get access to the slave cluster," +
              "please see the log");
        }
      } catch (KeeperException ex) {
        throw new IOException("Couldn't get access to the slave cluster" +
            " because: ", ex);
      }
      return null;
    }
  });
  conf.set(NAME+".peerId", peerId);
  conf.set(NAME+".tableName", tableName);
  conf.setLong(NAME+".startTime", startTime);
  conf.setLong(NAME+".endTime", endTime);
  if (families != null) {
    conf.set(NAME+".families", families);
  }
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(VerifyReplication.class);

  Scan scan = new Scan();
  if (startTime != 0) {
    scan.setTimeRange(startTime,
        endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
  }
  if(families != null) {
    String[] fams = families.split(",");
    for(String fam : fams) {
      scan.addFamily(Bytes.toBytes(fam));
    }
  }
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      Verifier.class, null, null, job);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  return job;
}
项目:hbase-0.94.8-qod    文件:VerifyReplication.java   
/**
 * Sets up the actual job.
 *
 * @param conf  The current configuration.
 * @param args  The command line parameters.
 * @return The newly created job.
 * @throws java.io.IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
  if (!doCommandLine(args)) {
    return null;
  }
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
    throw new IOException("Replication needs to be enabled to verify it.");
  }
  HConnectionManager.execute(new HConnectable<Void>(conf) {
    @Override
    public Void connect(HConnection conn) throws IOException {
      try {
        ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
            conn.getZooKeeperWatcher());
        // Just verifying it we can connect
        ReplicationPeer peer = zk.getPeer(peerId);
        if (peer == null) {
          throw new IOException("Couldn't get access to the slave cluster," +
              "please see the log");
        }
      } catch (KeeperException ex) {
        throw new IOException("Couldn't get access to the slave cluster" +
            " because: ", ex);
      }
      return null;
    }
  });
  conf.set(NAME+".peerId", peerId);
  conf.set(NAME+".tableName", tableName);
  conf.setLong(NAME+".startTime", startTime);
  conf.setLong(NAME+".endTime", endTime);
  if (families != null) {
    conf.set(NAME+".families", families);
  }
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(VerifyReplication.class);

  Scan scan = new Scan();
  if (startTime != 0) {
    scan.setTimeRange(startTime,
        endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
  }
  if(families != null) {
    String[] fams = families.split(",");
    for(String fam : fams) {
      scan.addFamily(Bytes.toBytes(fam));
    }
  }
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      Verifier.class, null, null, job);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  return job;
}
项目:hindex    文件:VerifyReplication.java   
/**
 * Sets up the actual job.
 *
 * @param conf  The current configuration.
 * @param args  The command line parameters.
 * @return The newly created job.
 * @throws java.io.IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
  if (!doCommandLine(args)) {
    return null;
  }
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
    throw new IOException("Replication needs to be enabled to verify it.");
  }
  HConnectionManager.execute(new HConnectable<Void>(conf) {
    @Override
    public Void connect(HConnection conn) throws IOException {
      try {
        ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
            conn.getZooKeeperWatcher());
        // Just verifying it we can connect
        ReplicationPeer peer = zk.getPeer(peerId);
        if (peer == null) {
          throw new IOException("Couldn't get access to the slave cluster," +
              "please see the log");
        }
      } catch (KeeperException ex) {
        throw new IOException("Couldn't get access to the slave cluster" +
            " because: ", ex);
      }
      return null;
    }
  });
  conf.set(NAME+".peerId", peerId);
  conf.set(NAME+".tableName", tableName);
  conf.setLong(NAME+".startTime", startTime);
  conf.setLong(NAME+".endTime", endTime);
  if (families != null) {
    conf.set(NAME+".families", families);
  }
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(VerifyReplication.class);

  Scan scan = new Scan();
  if (startTime != 0) {
    scan.setTimeRange(startTime,
        endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
  }
  if(families != null) {
    String[] fams = families.split(",");
    for(String fam : fams) {
      scan.addFamily(Bytes.toBytes(fam));
    }
  }
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      Verifier.class, null, null, job);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  return job;
}
项目:hbase    文件:ReplicationSourceInterface.java   
/**
 * Initializer for the source
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param server the server for this region server
 */
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException;