Java class org.apache.hadoop.hbase.util.MultiThreadedWriter example source code
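
MultiThreadedWriter is one of HBase's load-testing utilities in org.apache.hadoop.hbase.util: it spawns a pool of writer threads that insert rows produced by a LoadTestDataGenerator into a table over a given key range, and it exposes counters such as getNumWriteFailures() for tests to assert on. The examples below show how several HBase forks drive it. As a starting point, here is a minimal usage sketch assembled from those examples; the data generator (MultiThreadedAction.DefaultDataGenerator), the table name "test_table", and the column family "cf" are illustrative assumptions, not taken from the snippets.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;

public class MultiThreadedWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical table and column family; the table must already exist.
    TableName tableName = TableName.valueOf("test_table");
    MultiThreadedAction.DefaultDataGenerator dataGen =
        new MultiThreadedAction.DefaultDataGenerator(Bytes.toBytes("cf"));

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
    writer.setMultiPut(true);     // send all columns of a key in a single batched put
    writer.start(0, 100000, 20);  // write keys [0, 100000) using 20 writer threads
    writer.waitForFinish();       // block until every writer thread has stopped
    if (writer.getNumWriteFailures() > 0) {
      throw new RuntimeException("Load failed");
    }
  }
}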

Project: ditb    File: StripeCompactionsPerformanceEvaluation.java
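This performance-evaluation method optionally preloads a key range with one MultiThreadedWriter, then runs a second writer concurrently with a MultiThreadedReader linked to it, and finally asserts that neither writes nor reads failed.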
private void runOneTest(String description, Configuration conf) throws Exception {
  int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  long startKey = (long)preloadKeys * numServers;
  long endKey = startKey + (long)writeKeys * numServers;
  status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

  if (preloadKeys > 0) {
    MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    long time = System.currentTimeMillis();
    preloader.start(0, startKey, writeThreads);
    preloader.waitForFinish();
    if (preloader.getNumWriteFailures() > 0) {
      throw new IOException("Preload failed");
    }
    int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary
    status(description + " preload took " + (System.currentTimeMillis()-time)/1000
        + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize");
    Thread.sleep(waitTime);
  }

  MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100); // verifyPercent = 100: check every read against the generator
  // reader.getMetrics().enable();
  reader.linkToWriter(writer); // restrict the reader to keys the writer has already inserted

  long testStartTime = System.currentTimeMillis();
  writer.start(startKey, endKey, writeThreads);
  reader.start(startKey, endKey, readThreads);
  writer.waitForFinish();
  reader.waitForFinish();
  // reader.waitForVerification(300000);
  // reader.abortAndWaitForFinish();
  status("Readers and writers stopped for test " + description);

  boolean success = writer.getNumWriteFailures() == 0;
  if (!success) {
    LOG.error("Write failed");
  } else {
    success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
    if (!success) {
      LOG.error("Read failed");
    }
  }

  // Dump perf regardless of the result.
  /*StringBuilder perfDump = new StringBuilder();
  for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
    perfDump.append(String.format(
        "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond()));
  }
  if (dumpTimePerf) {
    Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
    while (timePerf.hasNext()) {
      Triple<Long, Double, Long> pt = timePerf.next();
      perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n",
          description, pt.getFirst(), pt.getThird(), pt.getSecond()));
    }
  }
  LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/
  status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec");
  Assert.assertTrue(success);
}
Project: ditb    File: IntegrationTestLazyCfLoading.java
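This integration test starts a MultiThreadedWriter in the background with multi-put enabled (several column families are written and verified together), then repeatedly scans the table with lazy column-family loading turned on, checking every row against the data generator until the writer reports it is done or the time limit expires.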
@Test
public void testReadersAndWriters() throws Exception {
  Configuration conf = util.getConfiguration();
  String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
  long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
  long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
  Table table = new HTable(conf, TABLE_NAME);

  // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
  // their integrity, therefore multi-put is necessary.
  MultiThreadedWriter writer =
    new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  writer.setMultiPut(true);

  LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
  // TODO : Need to see if tag support has to be given here in the integration test suite
  writer.start(1, keysToWrite, WRITER_THREADS);

  // Now, do scans.
  long now = EnvironmentEdgeManager.currentTime();
  long timeLimit = now + (maxRuntime * 60000);
  boolean isWriterDone = false;
  while (now < timeLimit && !isWriterDone) {
    LOG.info("Starting the scan; wrote approximately "
      + dataGen.getTotalNumberOfKeys() + " keys");
    isWriterDone = writer.isDone();
    if (isWriterDone) {
      LOG.info("Scanning full result, writer is done");
    }
    Scan scan = new Scan();
    for (byte[] cf : dataGen.getColumnFamilies()) {
      scan.addFamily(cf);
    }
    scan.setFilter(dataGen.getScanFilter());
    scan.setLoadColumnFamiliesOnDemand(true);
    // The number of keys we can expect from scan - lower bound (before scan).
    // Not a strict lower bound - writer knows nothing about filters, so we report
    // this from generator. Writer might have generated the value but not put it yet.
    long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
    long startTs = EnvironmentEdgeManager.currentTime();
    ResultScanner results = table.getScanner(scan);
    long resultCount = 0;
    Result result = null;
    // Verify and count the results.
    while ((result = results.next()) != null) {
      boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
      Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
      ++resultCount;
    }
    long timeTaken = EnvironmentEdgeManager.currentTime() - startTs;
    // Verify the result count.
    long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
    Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
      + " were generated ", onesGennedAfterScan >= resultCount);
    if (isWriterDone) {
      Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
        + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
    } else if (onesGennedBeforeScan * 0.9 > resultCount) {
      LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
        + ") - there might be a problem, or the writer might just be slow");
    }
    LOG.info("Scan took " + timeTaken + "ms");
    if (!isWriterDone) {
      Thread.sleep(WAIT_BETWEEN_SCANS_MS);
      now = EnvironmentEdgeManager.currentTime();
    }
  }
  Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
  Assert.assertTrue("Writer is not done", isWriterDone);
  // Assert.fail("Boom!");
}
Project: HIndex    File: StripeCompactionsPerformanceEvaluation.java
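The HIndex variant of the same evaluation differs from the ditb one above only in that TABLE_NAME is a String here, so it is converted with TableName.valueOf(TABLE_NAME) before the writers and reader are constructed.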
private void runOneTest(String description, Configuration conf) throws Exception {
  int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  long startKey = (long)preloadKeys * numServers;
  long endKey = startKey + (long)writeKeys * numServers;
  status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

  TableName tn = TableName.valueOf(TABLE_NAME);
  if (preloadKeys > 0) {
    MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn);
    long time = System.currentTimeMillis();
    preloader.start(0, startKey, writeThreads);
    preloader.waitForFinish();
    if (preloader.getNumWriteFailures() > 0) {
      throw new IOException("Preload failed");
    }
    int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary
    status(description + " preload took " + (System.currentTimeMillis()-time)/1000
        + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize");
    Thread.sleep(waitTime);
  }

  MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tn);
  MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tn, 100);
  // reader.getMetrics().enable();
  reader.linkToWriter(writer);

  long testStartTime = System.currentTimeMillis();
  writer.start(startKey, endKey, writeThreads);
  reader.start(startKey, endKey, readThreads);
  writer.waitForFinish();
  reader.waitForFinish();
  // reader.waitForVerification(300000);
  // reader.abortAndWaitForFinish();
  status("Readers and writers stopped for test " + description);

  boolean success = writer.getNumWriteFailures() == 0;
  if (!success) {
    LOG.error("Write failed");
  } else {
    success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
    if (!success) {
      LOG.error("Read failed");
    }
  }

  // Dump perf regardless of the result.
  /*StringBuilder perfDump = new StringBuilder();
  for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
    perfDump.append(String.format(
        "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond()));
  }
  if (dumpTimePerf) {
    Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
    while (timePerf.hasNext()) {
      Triple<Long, Double, Long> pt = timePerf.next();
      perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n",
          description, pt.getFirst(), pt.getThird(), pt.getSecond()));
    }
  }
  LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/
  status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec");
  Assert.assertTrue(success);
}
Project: HIndex    File: IntegrationTestLazyCfLoading.java
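This variant of the lazy-CF test is built against an older client API: it uses the concrete HTable type and EnvironmentEdgeManager.currentTimeMillis() where newer code uses Table and currentTime().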
@Test
public void testReadersAndWriters() throws Exception {
  Configuration conf = util.getConfiguration();
  String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
  long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
  long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
  HTable table = new HTable(conf, TABLE_NAME);

  // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
  // their integrity, therefore multi-put is necessary.
  MultiThreadedWriter writer =
    new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  writer.setMultiPut(true);

  LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
  // TODO : Need to see if tag support has to be given here in the integration test suite
  writer.start(1, keysToWrite, WRITER_THREADS);

  // Now, do scans.
  long now = EnvironmentEdgeManager.currentTimeMillis();
  long timeLimit = now + (maxRuntime * 60000);
  boolean isWriterDone = false;
  while (now < timeLimit && !isWriterDone) {
    LOG.info("Starting the scan; wrote approximately "
      + dataGen.getTotalNumberOfKeys() + " keys");
    isWriterDone = writer.isDone();
    if (isWriterDone) {
      LOG.info("Scanning full result, writer is done");
    }
    Scan scan = new Scan();
    for (byte[] cf : dataGen.getColumnFamilies()) {
      scan.addFamily(cf);
    }
    scan.setFilter(dataGen.getScanFilter());
    scan.setLoadColumnFamiliesOnDemand(true);
    // The number of keys we can expect from scan - lower bound (before scan).
    // Not a strict lower bound - writer knows nothing about filters, so we report
    // this from generator. Writer might have generated the value but not put it yet.
    long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
    long startTs = EnvironmentEdgeManager.currentTimeMillis();
    ResultScanner results = table.getScanner(scan);
    long resultCount = 0;
    Result result = null;
    // Verify and count the results.
    while ((result = results.next()) != null) {
      boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
      Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
      ++resultCount;
    }
    long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs;
    // Verify the result count.
    long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
    Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
      + " were generated ", onesGennedAfterScan >= resultCount);
    if (isWriterDone) {
      Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
        + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
    } else if (onesGennedBeforeScan * 0.9 > resultCount) {
      LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
        + ") - there might be a problem, or the writer might just be slow");
    }
    LOG.info("Scan took " + timeTaken + "ms");
    if (!isWriterDone) {
      Thread.sleep(WAIT_BETWEEN_SCANS_MS);
      now = EnvironmentEdgeManager.currentTimeMillis();
    }
  }
  Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
  Assert.assertTrue("Writer is not done", isWriterDone);
  // Assert.fail("Boom!");
}
Project: hbase    File: StripeCompactionsPerformanceEvaluation.java
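The upstream hbase variant counts live region servers through the newer ClusterMetrics API (getClusterMetrics().getLiveServerMetrics().size()) rather than the deprecated ClusterStatus, and passes TABLE_NAME (already a TableName) directly to the writers and reader.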
private void runOneTest(String description, Configuration conf) throws Exception {
  int numServers = util.getHBaseClusterInterface()
    .getClusterMetrics().getLiveServerMetrics().size();
  long startKey = preloadKeys * numServers;
  long endKey = startKey + writeKeys * numServers;
  status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

  if (preloadKeys > 0) {
    MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    long time = System.currentTimeMillis();
    preloader.start(0, startKey, writeThreads);
    preloader.waitForFinish();
    if (preloader.getNumWriteFailures() > 0) {
      throw new IOException("Preload failed");
    }
    int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary
    status(description + " preload took " + (System.currentTimeMillis()-time)/1000
        + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize");
    Thread.sleep(waitTime);
  }

  MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
  // reader.getMetrics().enable();
  reader.linkToWriter(writer);

  long testStartTime = System.currentTimeMillis();
  writer.start(startKey, endKey, writeThreads);
  reader.start(startKey, endKey, readThreads);
  writer.waitForFinish();
  reader.waitForFinish();
  // reader.waitForVerification(300000);
  // reader.abortAndWaitForFinish();
  status("Readers and writers stopped for test " + description);

  boolean success = writer.getNumWriteFailures() == 0;
  if (!success) {
    LOG.error("Write failed");
  } else {
    success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
    if (!success) {
      LOG.error("Read failed");
    }
  }

  // Dump perf regardless of the result.
  /*StringBuilder perfDump = new StringBuilder();
  for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
    perfDump.append(String.format(
        "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond()));
  }
  if (dumpTimePerf) {
    Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
    while (timePerf.hasNext()) {
      Triple<Long, Double, Long> pt = timePerf.next();
      perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n",
          description, pt.getFirst(), pt.getThird(), pt.getSecond()));
    }
  }
  LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/
  status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec");
  Assert.assertTrue(success);
}
Project: hbase    File: IntegrationTestLazyCfLoading.java
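The upstream hbase variant of the lazy-CF test acquires its Table through ConnectionFactory.createConnection(conf) and connection.getTable(TABLE_NAME), the pattern that replaced the deprecated HTable constructors, and closes the connection when the test ends.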
@Test
public void testReadersAndWriters() throws Exception {
  Configuration conf = util.getConfiguration();
  String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
  long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
  long serverCount = util.getHBaseClusterInterface().getClusterMetrics()
    .getLiveServerMetrics().size();
  long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
  Connection connection = ConnectionFactory.createConnection(conf);
  Table table = connection.getTable(TABLE_NAME);

  // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
  // their integrity, therefore multi-put is necessary.
  MultiThreadedWriter writer =
    new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  writer.setMultiPut(true);

  LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
  // TODO : Need to see if tag support has to be given here in the integration test suite
  writer.start(1, keysToWrite, WRITER_THREADS);

  // Now, do scans.
  long now = EnvironmentEdgeManager.currentTime();
  long timeLimit = now + (maxRuntime * 60000);
  boolean isWriterDone = false;
  while (now < timeLimit && !isWriterDone) {
    LOG.info("Starting the scan; wrote approximately "
      + dataGen.getTotalNumberOfKeys() + " keys");
    isWriterDone = writer.isDone();
    if (isWriterDone) {
      LOG.info("Scanning full result, writer is done");
    }
    Scan scan = new Scan();
    for (byte[] cf : dataGen.getColumnFamilies()) {
      scan.addFamily(cf);
    }
    scan.setFilter(dataGen.getScanFilter());
    scan.setLoadColumnFamiliesOnDemand(true);
    // The number of keys we can expect from scan - lower bound (before scan).
    // Not a strict lower bound - writer knows nothing about filters, so we report
    // this from generator. Writer might have generated the value but not put it yet.
    long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
    long startTs = EnvironmentEdgeManager.currentTime();
    ResultScanner results = table.getScanner(scan);
    long resultCount = 0;
    Result result = null;
    // Verify and count the results.
    while ((result = results.next()) != null) {
      boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
      Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
      ++resultCount;
    }
    long timeTaken = EnvironmentEdgeManager.currentTime() - startTs;
    // Verify the result count.
    long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
    Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
      + " were generated ", onesGennedAfterScan >= resultCount);
    if (isWriterDone) {
      Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
        + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
    } else if (onesGennedBeforeScan * 0.9 > resultCount) {
      LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
        + ") - there might be a problem, or the writer might just be slow");
    }
    LOG.info("Scan took " + timeTaken + "ms");
    if (!isWriterDone) {
      Thread.sleep(WAIT_BETWEEN_SCANS_MS);
      now = EnvironmentEdgeManager.currentTime();
    }
  }
  Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
  Assert.assertTrue("Writer is not done", isWriterDone);
  // Assert.fail("Boom!");
  connection.close();
}
Project: PyroDB    File: StripeCompactionsPerformanceEvaluation.java
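The PyroDB variant matches the HIndex one: TABLE_NAME is a String and is converted via TableName.valueOf before use.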
private void runOneTest(String description, Configuration conf) throws Exception {
  int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  long startKey = (long)preloadKeys * numServers;
  long endKey = startKey + (long)writeKeys * numServers;
  status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

  TableName tn = TableName.valueOf(TABLE_NAME);
  if (preloadKeys > 0) {
    MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn);
    long time = System.currentTimeMillis();
    preloader.start(0, startKey, writeThreads);
    preloader.waitForFinish();
    if (preloader.getNumWriteFailures() > 0) {
      throw new IOException("Preload failed");
    }
    int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary
    status(description + " preload took " + (System.currentTimeMillis()-time)/1000
        + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize");
    Thread.sleep(waitTime);
  }

  MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tn);
  MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tn, 100);
  // reader.getMetrics().enable();
  reader.linkToWriter(writer);

  long testStartTime = System.currentTimeMillis();
  writer.start(startKey, endKey, writeThreads);
  reader.start(startKey, endKey, readThreads);
  writer.waitForFinish();
  reader.waitForFinish();
  // reader.waitForVerification(300000);
  // reader.abortAndWaitForFinish();
  status("Readers and writers stopped for test " + description);

  boolean success = writer.getNumWriteFailures() == 0;
  if (!success) {
    LOG.error("Write failed");
  } else {
    success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
    if (!success) {
      LOG.error("Read failed");
    }
  }

  // Dump perf regardless of the result.
  /*StringBuilder perfDump = new StringBuilder();
  for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
    perfDump.append(String.format(
        "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond()));
  }
  if (dumpTimePerf) {
    Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
    while (timePerf.hasNext()) {
      Triple<Long, Double, Long> pt = timePerf.next();
      perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n",
          description, pt.getFirst(), pt.getThird(), pt.getSecond()));
    }
  }
  LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/
  status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec");
  Assert.assertTrue(success);
}
Project: PyroDB    File: IntegrationTestLazyCfLoading.java
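As in the HIndex copy, this test uses the older HTable and currentTimeMillis() API.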
@Test
public void testReadersAndWriters() throws Exception {
  Configuration conf = util.getConfiguration();
  String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
  long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
  long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
  HTable table = new HTable(conf, TABLE_NAME);

  // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
  // their integrity, therefore multi-put is necessary.
  MultiThreadedWriter writer =
    new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  writer.setMultiPut(true);

  LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
  // TODO : Need to see if tag support has to be given here in the integration test suite
  writer.start(1, keysToWrite, WRITER_THREADS);

  // Now, do scans.
  long now = EnvironmentEdgeManager.currentTimeMillis();
  long timeLimit = now + (maxRuntime * 60000);
  boolean isWriterDone = false;
  while (now < timeLimit && !isWriterDone) {
    LOG.info("Starting the scan; wrote approximately "
      + dataGen.getTotalNumberOfKeys() + " keys");
    isWriterDone = writer.isDone();
    if (isWriterDone) {
      LOG.info("Scanning full result, writer is done");
    }
    Scan scan = new Scan();
    for (byte[] cf : dataGen.getColumnFamilies()) {
      scan.addFamily(cf);
    }
    scan.setFilter(dataGen.getScanFilter());
    scan.setLoadColumnFamiliesOnDemand(true);
    // The number of keys we can expect from scan - lower bound (before scan).
    // Not a strict lower bound - writer knows nothing about filters, so we report
    // this from generator. Writer might have generated the value but not put it yet.
    long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
    long startTs = EnvironmentEdgeManager.currentTimeMillis();
    ResultScanner results = table.getScanner(scan);
    long resultCount = 0;
    Result result = null;
    // Verify and count the results.
    while ((result = results.next()) != null) {
      boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
      Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
      ++resultCount;
    }
    long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs;
    // Verify the result count.
    long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
    Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
      + " were generated ", onesGennedAfterScan >= resultCount);
    if (isWriterDone) {
      Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
        + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
    } else if (onesGennedBeforeScan * 0.9 > resultCount) {
      LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
        + ") - there might be a problem, or the writer might just be slow");
    }
    LOG.info("Scan took " + timeTaken + "ms");
    if (!isWriterDone) {
      Thread.sleep(WAIT_BETWEEN_SCANS_MS);
      now = EnvironmentEdgeManager.currentTimeMillis();
    }
  }
  Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
  Assert.assertTrue("Writer is not done", isWriterDone);
  // Assert.fail("Boom!");
}
Project: DominoHBase    File: IntegrationTestLazyCfLoading.java
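The DominoHBase variant targets the oldest API of the set: both HTable and MultiThreadedWriter take the table name as a byte[] (Bytes.toBytes(TABLE_NAME)) rather than a TableName.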
@Test
public void testReadersAndWriters() throws Exception {
  Configuration conf = util.getConfiguration();
  String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
  long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
  long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
  HTable table = new HTable(conf, Bytes.toBytes(TABLE_NAME));

  // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
  // their integrity, therefore multi-put is necessary.
  MultiThreadedWriter writer =
    new MultiThreadedWriter(dataGen, conf, Bytes.toBytes(TABLE_NAME));
  writer.setMultiPut(true);

  LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
  writer.start(1, keysToWrite, WRITER_THREADS);

  // Now, do scans.
  long now = EnvironmentEdgeManager.currentTimeMillis();
  long timeLimit = now + (maxRuntime * 60000);
  boolean isWriterDone = false;
  while (now < timeLimit && !isWriterDone) {
    LOG.info("Starting the scan; wrote approximately "
      + dataGen.getTotalNumberOfKeys() + " keys");
    isWriterDone = writer.isDone();
    if (isWriterDone) {
      LOG.info("Scanning full result, writer is done");
    }
    Scan scan = new Scan();
    for (byte[] cf : dataGen.getColumnFamilies()) {
      scan.addFamily(cf);
    }
    scan.setFilter(dataGen.getScanFilter());
    scan.setLoadColumnFamiliesOnDemand(true);
    // The number of keys we can expect from scan - lower bound (before scan).
    // Not a strict lower bound - writer knows nothing about filters, so we report
    // this from generator. Writer might have generated the value but not put it yet.
    long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
    long startTs = EnvironmentEdgeManager.currentTimeMillis();
    ResultScanner results = table.getScanner(scan);
    long resultCount = 0;
    Result result = null;
    // Verify and count the results.
    while ((result = results.next()) != null) {
      boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
      Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
      ++resultCount;
    }
    long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs;
    // Verify the result count.
    long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
    Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
      + " were generated ", onesGennedAfterScan >= resultCount);
    if (isWriterDone) {
      Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
        + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
    } else if (onesGennedBeforeScan * 0.9 > resultCount) {
      LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
        + ") - there might be a problem, or the writer might just be slow");
    }
    LOG.info("Scan took " + timeTaken + "ms");
    if (!isWriterDone) {
      Thread.sleep(WAIT_BETWEEN_SCANS_MS);
      now = EnvironmentEdgeManager.currentTimeMillis();
    }
  }
  Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
  Assert.assertTrue("Writer is not done", isWriterDone);
  // Assert.fail("Boom!");
}