Java 类org.apache.hadoop.util.StopWatch 实例源码

项目:hadoop-oss    文件:SecurityUtil.java   
/**
 * Looks up the address of a host through the configured resolver, subject
 * to the security requirements determined by
 * hadoop.security.token.service.use_ip. The lookup is timed whenever
 * slow-lookup logging or trace logging is enabled.
 *
 * @param hostname host or ip to resolve
 * @return the address produced by the resolver
 * @throws UnknownHostException if the host doesn't exist
 */
@InterfaceAudience.Private
public static
InetAddress getByName(String hostname) throws UnknownHostException {
  // Fast path: nobody is interested in timing, so skip the stopwatch.
  if (!logSlowLookups && !LOG.isTraceEnabled()) {
    return hostResolver.getByName(hostname);
  }

  StopWatch timer = new StopWatch().start();
  InetAddress address = hostResolver.getByName(hostname);
  timer.stop();
  long tookMs = timer.now(TimeUnit.MILLISECONDS);

  boolean slow = tookMs >= slowLookupThresholdMs;
  if (slow) {
    // Lookups at or above the threshold are always reported as warnings.
    LOG.warn("Slow name lookup for " + hostname + ". Took " + tookMs
        + " ms.");
  } else if (LOG.isTraceEnabled()) {
    LOG.trace("Name lookup for " + hostname + " took " + tookMs
        + " ms.");
  }
  return address;
}
项目:hadoop    文件:TestJournalNode.java   
/**
 * Simple write benchmark: starts epoch 1 and log segment 1 on the channel,
 * sends {@code numEdits} batches of {@code editsSize} zero bytes, waiting
 * for each one to complete, and prints timing figures to stderr.
 *
 * @param editsSize size in bytes of the payload sent with every batch
 * @param numEdits number of batches to send
 * @throws Exception if any call on the channel fails
 */
private void doPerfTest(int editsSize, int numEdits) throws Exception {
  byte[] data = new byte[editsSize];
  ch.newEpoch(1).get();
  ch.setEpoch(1);
  ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();

  StopWatch sw = new StopWatch().start();
  // Send exactly numEdits batches (ids 1..numEdits) so the count matches
  // what is reported and averaged below.
  for (int sent = 0; sent < numEdits; sent++) {
    ch.sendEdits(1L, sent + 1, 1, data).get();
  }
  long time = sw.now(TimeUnit.MILLISECONDS);

  System.err.println("Wrote " + numEdits + " batches of " + editsSize +
      " bytes in " + time + "ms");
  float avgRtt = (float)time/(float)numEdits;
  System.err.println("Time per batch: " + avgRtt + "ms");
  // A run shorter than one millisecond reports 0 ms; skip the integer
  // division instead of failing with ArithmeticException.
  if (time > 0) {
    long throughput = ((long)numEdits * editsSize * 1000L)/time;
    System.err.println("Throughput: " + throughput + " bytes/sec");
  } else {
    System.err.println("Throughput: n/a (elapsed time below 1ms)");
  }
}
项目:aliyun-oss-hadoop-fs    文件:KVJob.java   
/**
 * Configures a job that reads a sequence file from {@code inputpath} and
 * writes to {@code outputpath}. When the configuration flag
 * {@code TestConstants.NATIVETASK_KVTEST_CREATEFILE} is "true", any existing
 * input is deleted and a fresh test file is generated first.
 *
 * @param jobname name given to the job
 * @param conf configuration used for the job and for test-file creation
 * @param keyclass key class of the job output and of the generated file
 * @param valueclass map output value class and value class of the
 *        generated file
 * @param inputpath path of the sequence-file input
 * @param outputpath path the job writes its output to
 * @throws Exception if job setup or test-file creation fails
 */
public KVJob(String jobname, Configuration conf,
             Class<?> keyclass, Class<?> valueclass,
             String inputpath, String outputpath) throws Exception {
  job = Job.getInstance(conf, jobname);
  job.setJarByClass(KVJob.class);
  job.setMapperClass(KVJob.ValueMapper.class);
  job.setOutputKeyClass(keyclass);
  job.setMapOutputValueClass(valueclass);

  // Constant-first comparison: an unset flag means "do not create the
  // file" rather than a NullPointerException.
  if ("true".equals(conf.get(TestConstants.NATIVETASK_KVTEST_CREATEFILE))) {
    final FileSystem fs = FileSystem.get(conf);
    fs.delete(new Path(inputpath), true);
    // NOTE(review): FileSystem.get may hand back a shared cached instance;
    // confirm that closing it here cannot affect other users.
    fs.close();
    final TestInputFile testfile = new TestInputFile(Integer.valueOf(conf.get(
        TestConstants.FILESIZE_KEY, "1000")),
        keyclass.getName(), valueclass.getName(), conf);
    StopWatch sw = new StopWatch().start();
    testfile.createSequenceTestFile(inputpath);
    LOG.info("Created test file " + inputpath + " in "
        + sw.now(TimeUnit.MILLISECONDS) + "ms");
  }
  job.setInputFormatClass(SequenceFileInputFormat.class);
  FileInputFormat.addInputPath(job, new Path(inputpath));
  FileOutputFormat.setOutputPath(job, new Path(outputpath));
}
项目:aliyun-oss-hadoop-fs    文件:ErasureCodeBenchmarkThroughput.java   
/**
 * Writes {@code dataSizeMB} megabytes to {@code path} by repeatedly writing
 * the shared {@code data} buffer, and prints the elapsed time.
 *
 * @param path file to create
 * @return number of bytes written
 * @throws IOException if creating or writing the file fails
 */
private long writeFile(Path path) throws IOException {
  StopWatch sw = new StopWatch().start();
  System.out.println("Writing " + path);
  // Multiply in long arithmetic from the start so a large dataSizeMB cannot
  // overflow an intermediate int product.
  long dataSize = dataSizeMB * 1024L * 1024L;
  long remaining = dataSize;
  try (FSDataOutputStream outputStream = fs.create(path)) {
    if (!isGen) {
      // Files not written in "gen" mode are registered for deletion on exit.
      fs.deleteOnExit(path);
    }
    int toWrite;
    while (remaining > 0) {
      // The final chunk may be shorter than the buffer.
      toWrite = (int) Math.min(remaining, data.length);
      outputStream.write(data, 0, toWrite);
      remaining -= toWrite;
    }
    System.out.println("Finished writing " + path + ". Time taken: " +
        sw.now(TimeUnit.SECONDS) + " s.");
    return dataSize - remaining;
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestJournalNode.java   
/**
 * Simple write benchmark: starts epoch 1 and log segment 1 on the channel,
 * sends {@code numEdits} batches of {@code editsSize} zero bytes, waiting
 * for each one to complete, and prints timing figures to stderr.
 *
 * @param editsSize size in bytes of the payload sent with every batch
 * @param numEdits number of batches to send
 * @throws Exception if any call on the channel fails
 */
private void doPerfTest(int editsSize, int numEdits) throws Exception {
  byte[] data = new byte[editsSize];
  ch.newEpoch(1).get();
  ch.setEpoch(1);
  ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();

  StopWatch sw = new StopWatch().start();
  // Send exactly numEdits batches (ids 1..numEdits) so the count matches
  // what is reported and averaged below.
  for (int sent = 0; sent < numEdits; sent++) {
    ch.sendEdits(1L, sent + 1, 1, data).get();
  }
  long time = sw.now(TimeUnit.MILLISECONDS);

  System.err.println("Wrote " + numEdits + " batches of " + editsSize +
      " bytes in " + time + "ms");
  float avgRtt = (float)time/(float)numEdits;
  System.err.println("Time per batch: " + avgRtt + "ms");
  // A run shorter than one millisecond reports 0 ms; skip the integer
  // division instead of failing with ArithmeticException.
  if (time > 0) {
    long throughput = ((long)numEdits * editsSize * 1000L)/time;
    System.err.println("Throughput: " + throughput + " bytes/sec");
  } else {
    System.err.println("Throughput: n/a (elapsed time below 1ms)");
  }
}
项目:big-c    文件:TestJournalNode.java   
/**
 * Simple write benchmark: starts epoch 1 and log segment 1 on the channel,
 * sends {@code numEdits} batches of {@code editsSize} zero bytes, waiting
 * for each one to complete, and prints timing figures to stderr.
 *
 * @param editsSize size in bytes of the payload sent with every batch
 * @param numEdits number of batches to send
 * @throws Exception if any call on the channel fails
 */
private void doPerfTest(int editsSize, int numEdits) throws Exception {
  byte[] data = new byte[editsSize];
  ch.newEpoch(1).get();
  ch.setEpoch(1);
  ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();

  StopWatch sw = new StopWatch().start();
  // Send exactly numEdits batches (ids 1..numEdits) so the count matches
  // what is reported and averaged below.
  for (int sent = 0; sent < numEdits; sent++) {
    ch.sendEdits(1L, sent + 1, 1, data).get();
  }
  long time = sw.now(TimeUnit.MILLISECONDS);

  System.err.println("Wrote " + numEdits + " batches of " + editsSize +
      " bytes in " + time + "ms");
  float avgRtt = (float)time/(float)numEdits;
  System.err.println("Time per batch: " + avgRtt + "ms");
  // A run shorter than one millisecond reports 0 ms; skip the integer
  // division instead of failing with ArithmeticException.
  if (time > 0) {
    long throughput = ((long)numEdits * editsSize * 1000L)/time;
    System.err.println("Throughput: " + throughput + " bytes/sec");
  } else {
    System.err.println("Throughput: n/a (elapsed time below 1ms)");
  }
}
项目:hadoop    文件:TestMultiThreadedHflush.java   
/**
 * Writes {@code toWrite} to the stream, hflushes it, and records how long
 * the pair of calls took (in microseconds) in {@code quantiles}.
 *
 * @throws IOException if the write or the hflush fails
 */
private void doAWrite() throws IOException {
  final StopWatch timer = new StopWatch();
  timer.start();
  stm.write(toWrite);
  stm.hflush();
  // Sample the latency only after the flush has returned.
  quantiles.insert(timer.now(TimeUnit.MICROSECONDS));
}
项目:hadoop    文件:TestMultiThreadedHflush.java   
/**
 * Runs the multi-threaded hflush workload against the file named by the
 * single command-line argument and prints the total time and the latency
 * quantiles. Prints usage and exits the JVM with status 1 when the argument
 * count is wrong.
 *
 * @param args exactly one element: the path of the test file
 * @return 0 once the workload has completed
 * @throws Exception if the workload fails
 */
public int run(String args[]) throws Exception {
  if (args.length != 1) {
    String toolName = TestMultiThreadedHflush.class.getSimpleName();
    System.err.println("usage: " + toolName + " <path to test file> ");
    System.err.println(
        "Configurations settable by -D options:\n"
        + "  num.threads [default 10] - how many threads to run\n"
        + "  write.size [default 511] - bytes per write\n"
        + "  num.writes [default 50000] - how many writes to perform");
    System.exit(1);
  }
  TestMultiThreadedHflush workload = new TestMultiThreadedHflush();
  Configuration conf = getConf();
  Path target = new Path(args[0]);

  // Tuning knobs, each falling back to the default advertised in the usage.
  int threadCount = conf.getInt("num.threads", 10);
  int bytesPerWrite = conf.getInt("write.size", 511);
  int writeCount = conf.getInt("num.writes", 50000);
  int replicas = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);

  StopWatch timer = new StopWatch();
  timer.start();
  workload.doMultithreadedWrites(conf, target, threadCount, bytesPerWrite,
      writeCount, replicas);
  timer.stop();
  long totalMs = timer.now(TimeUnit.MILLISECONDS);

  System.out.println("Finished in " + totalMs + "ms");
  System.out.println("Latency quantiles (in microseconds):\n"
      + workload.quantiles);
  return 0;
}
项目:aliyun-oss-hadoop-fs    文件:ErasureCodeBenchmarkThroughput.java   
/**
 * Runs one benchmark of the given type and prints the amount of data
 * processed, the elapsed time and the aggregate throughput.
 *
 * @param type operation to benchmark (READ, WRITE or GEN)
 * @param dataSizeMB data size in MB, passed through to the workload
 * @param numClients number of clients, passed through to the workload
 * @param isEc passed through to the workload
 * @param statefulRead passed through to the workload
 * @throws Exception if the workload fails
 * @throws IllegalArgumentException if {@code type} is not a handled value
 */
private void benchmark(OpType type, int dataSizeMB,
    int numClients, boolean isEc, boolean statefulRead) throws Exception {
  List<Long> sizes;
  StopWatch sw = new StopWatch().start();
  switch (type) {
    case READ:
      sizes = doBenchmark(true, dataSizeMB, numClients, isEc,
          statefulRead, false);
      break;
    case WRITE:
      sizes = doBenchmark(
          false, dataSizeMB, numClients, isEc, statefulRead, false);
      break;
    case GEN:
      sizes = doBenchmark(false, dataSizeMB, numClients, isEc,
          statefulRead, true);
      break;
    default:
      // Fail clearly instead of hitting a null list further down.
      throw new IllegalArgumentException(
          "Unsupported operation type: " + type);
  }
  // Keep sub-second precision: truncating to whole seconds inflates the
  // throughput of short runs and yields a division by zero below one second.
  double elapsedSec = sw.now(TimeUnit.MILLISECONDS) / 1000.0;
  double totalDataSizeMB = 0;
  for (Long size : sizes) {
    // Negative entries are left out of the total.
    if (size >= 0) {
      totalDataSizeMB += size.doubleValue() / 1024 / 1024;
    }
  }
  double throughput = totalDataSizeMB / elapsedSec;
  DecimalFormat df = getDecimalFormat();
  System.out.println(type + " " + df.format(totalDataSizeMB) +
      " MB data takes: " + df.format(elapsedSec) +
      " s.\nTotal throughput: " + df.format(throughput) + " MB/s.");
}
项目:aliyun-oss-hadoop-fs    文件:ErasureCodeBenchmarkThroughput.java   
/**
 * Opens {@code path} and reads it using either the stateful or the
 * positional strategy, depending on {@code statefulRead}, printing a start
 * line and a completion line with the elapsed whole seconds.
 *
 * @param path file to read
 * @return the value returned by the selected read routine
 * @throws IOException if opening or reading the file fails
 */
private long readFile(Path path) throws IOException {
  try (FSDataInputStream inputStream = fs.open(path)) {
    StopWatch timer = new StopWatch().start();

    String startLabel =
        statefulRead ? "Stateful reading " : "Positional reading ";
    System.out.println(startLabel + path);

    final long totalRead;
    if (statefulRead) {
      totalRead = doStateful(inputStream);
    } else {
      totalRead = doPositional(inputStream);
    }

    String doneLabel = statefulRead
        ? "Finished stateful read " : "Finished positional read ";
    System.out.println(doneLabel + path + ". Time taken: "
        + timer.now(TimeUnit.SECONDS) + " s.");
    return totalRead;
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestMultiThreadedHflush.java   
/**
 * Writes {@code toWrite} to the stream, hflushes it, and records how long
 * the pair of calls took (in microseconds) in {@code quantiles}.
 *
 * @throws IOException if the write or the hflush fails
 */
private void doAWrite() throws IOException {
  final StopWatch timer = new StopWatch();
  timer.start();
  stm.write(toWrite);
  stm.hflush();
  // Sample the latency only after the flush has returned.
  quantiles.insert(timer.now(TimeUnit.MICROSECONDS));
}
项目:aliyun-oss-hadoop-fs    文件:TestMultiThreadedHflush.java   
/**
 * Runs the multi-threaded hflush workload against the file named by the
 * single command-line argument and prints the total time and the latency
 * quantiles. Prints usage and exits the JVM with status 1 when the argument
 * count is wrong.
 *
 * @param args exactly one element: the path of the test file
 * @return 0 once the workload has completed
 * @throws Exception if the workload fails
 */
public int run(String args[]) throws Exception {
  if (args.length != 1) {
    String toolName = TestMultiThreadedHflush.class.getSimpleName();
    System.err.println("usage: " + toolName + " <path to test file> ");
    System.err.println(
        "Configurations settable by -D options:\n"
        + "  num.threads [default 10] - how many threads to run\n"
        + "  write.size [default 511] - bytes per write\n"
        + "  num.writes [default 50000] - how many writes to perform");
    System.exit(1);
  }
  TestMultiThreadedHflush workload = new TestMultiThreadedHflush();
  Configuration conf = getConf();
  Path target = new Path(args[0]);

  // Tuning knobs, each falling back to the default advertised in the usage.
  int threadCount = conf.getInt("num.threads", 10);
  int bytesPerWrite = conf.getInt("write.size", 511);
  int writeCount = conf.getInt("num.writes", 50000);
  int replicas = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);

  StopWatch timer = new StopWatch();
  timer.start();
  workload.doMultithreadedWrites(conf, target, threadCount, bytesPerWrite,
      writeCount, replicas);
  timer.stop();
  long totalMs = timer.now(TimeUnit.MILLISECONDS);

  System.out.println("Finished in " + totalMs + "ms");
  System.out.println("Latency quantiles (in microseconds):\n"
      + workload.quantiles);
  return 0;
}
项目:aliyun-oss-hadoop-fs    文件:TestBlockingThreadPoolExecutorService.java   
/**
 * Submits as many sleeper tasks as there are active plus waiting slots,
 * checking that none of those submissions blocks, then submits one more
 * and checks that this extra submission does block.
 */
@Test
public void testSubmitRunnable() throws Exception {
  ensureCreated();
  final int capacity = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
  final StopWatch timer = new StopWatch().start();

  int submitted = 0;
  while (submitted < capacity) {
    tpe.submit(sleeper);
    assertDidntBlock(timer);
    submitted++;
  }

  // One task beyond capacity: this submission is expected to block.
  tpe.submit(sleeper);
  assertDidBlock(timer);
}
项目:aliyun-oss-hadoop-fs    文件:TestBlockingThreadPoolExecutorService.java   
/**
 * Asserts that the time measured by {@code sw} does not exceed the blocking
 * threshold, then restarts the stopwatch whether or not the check passed.
 *
 * @param sw running stopwatch covering the call under test
 */
private void assertDidntBlock(StopWatch sw) {
  try {
    long elapsedMs = sw.now(TimeUnit.MILLISECONDS);
    boolean tookTooLong = elapsedMs > BLOCKING_THRESHOLD_MSEC;
    assertFalse("Non-blocking call took too long.", tookTooLong);
  } finally {
    // Begin a fresh measurement for the next call.
    sw.reset().start();
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestBlockingThreadPoolExecutorService.java   
/**
 * Asserts that the time measured by {@code sw} reached the blocking
 * threshold, then restarts the stopwatch whether or not the check passed.
 *
 * @param sw running stopwatch covering the call under test
 * @throws AssertionError if the call returned before the threshold elapsed
 */
private void assertDidBlock(StopWatch sw) {
  try {
    if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
      // Signal a failed expectation as an assertion failure, in line with
      // assertDidntBlock, rather than as a generic runtime error.
      throw new AssertionError("Blocking call returned too fast.");
    }
  } finally {
    // Begin a fresh measurement for the next call.
    sw.reset().start();
  }
}
项目:big-c    文件:TestMultiThreadedHflush.java   
/**
 * Writes {@code toWrite} to the stream, hflushes it, and records how long
 * the pair of calls took (in microseconds) in {@code quantiles}.
 *
 * @throws IOException if the write or the hflush fails
 */
private void doAWrite() throws IOException {
  final StopWatch timer = new StopWatch();
  timer.start();
  stm.write(toWrite);
  stm.hflush();
  // Sample the latency only after the flush has returned.
  quantiles.insert(timer.now(TimeUnit.MICROSECONDS));
}
项目:big-c    文件:TestMultiThreadedHflush.java   
/**
 * Runs the multi-threaded hflush workload against the file named by the
 * single command-line argument and prints the total time and the latency
 * quantiles. Prints usage and exits the JVM with status 1 when the argument
 * count is wrong.
 *
 * @param args exactly one element: the path of the test file
 * @return 0 once the workload has completed
 * @throws Exception if the workload fails
 */
public int run(String args[]) throws Exception {
  if (args.length != 1) {
    String toolName = TestMultiThreadedHflush.class.getSimpleName();
    System.err.println("usage: " + toolName + " <path to test file> ");
    System.err.println(
        "Configurations settable by -D options:\n"
        + "  num.threads [default 10] - how many threads to run\n"
        + "  write.size [default 511] - bytes per write\n"
        + "  num.writes [default 50000] - how many writes to perform");
    System.exit(1);
  }
  TestMultiThreadedHflush workload = new TestMultiThreadedHflush();
  Configuration conf = getConf();
  Path target = new Path(args[0]);

  // Tuning knobs, each falling back to the default advertised in the usage.
  int threadCount = conf.getInt("num.threads", 10);
  int bytesPerWrite = conf.getInt("write.size", 511);
  int writeCount = conf.getInt("num.writes", 50000);
  int replicas = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);

  StopWatch timer = new StopWatch();
  timer.start();
  workload.doMultithreadedWrites(conf, target, threadCount, bytesPerWrite,
      writeCount, replicas);
  timer.stop();
  long totalMs = timer.now(TimeUnit.MILLISECONDS);

  System.out.println("Finished in " + totalMs + "ms");
  System.out.println("Latency quantiles (in microseconds):\n"
      + workload.quantiles);
  return 0;
}
项目:hops    文件:RawErasureCoderBenchmark.java   
/**
 * Runs the encode or decode loop over the test buffer for the configured
 * number of rounds and reports how long it took.
 *
 * @return elapsed time of the whole loop in milliseconds
 * @throws Exception if encoding or decoding fails
 */
@Override
public Long call() throws Exception {
  final long rounds = BenchData.totalDataSizeKB / BenchData.bufferSizeKB;
  final StopWatch timer = new StopWatch().start();

  long round = 0;
  while (round < rounds) {
    // Consume the test buffer one stripe of input chunks at a time.
    while (testData.remaining() > 0) {
      for (ByteBuffer out : benchData.outputs) {
        out.clear();
      }

      final int inputCount = benchData.inputs.length;
      for (int idx = 0; idx < inputCount; idx++) {
        // Carve the next chunk out of testData as an independent slice and
        // advance past it.
        int chunkEnd = testData.position() + BenchData.chunkSize;
        ByteBuffer window = testData.duplicate();
        window.limit(chunkEnd);
        benchData.inputs[idx] = window.slice();
        testData.position(chunkEnd);
      }

      if (!isEncode) {
        benchData.prepareDecInput();
        benchData.decode(decoder);
      } else {
        benchData.encode(encoder);
      }
    }
    // Rewind so the next round walks the same data again.
    testData.clear();
    round++;
  }
  return timer.now(TimeUnit.MILLISECONDS);
}
项目:hadoop    文件:FileInputFormat.java   
/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return list of FileStatus objects
 * @throws IOException if the job has no input paths, or if listing fails
 *         or is interrupted
 */
protected List<FileStatus> listStatus(JobContext job
                                      ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                      job.getConfiguration());

  // Whether we need to recursive look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  List<FileStatus> result;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    // Any other thread count goes through the LocatedFileStatusFetcher.
    Iterable<FileStatus> locatedFiles;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      // Restore the interrupt flag for callers and keep the original
      // exception attached as the cause.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while getting file statuses", e);
    }
    result = Lists.newArrayList(locatedFiles);
  }

  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: "
        + sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input paths to process : " + result.size());
  return result;
}
项目:hadoop    文件:Journal.java   
/**
 * Write a batch of edits to the journal.
 *
 * <p>Runs the request and segment/txid checks, appends {@code records} to
 * the current segment, flushes it, and then updates the sync-latency and
 * write metrics along with the txid counters.
 *
 * @param reqInfo request metadata, passed to {@code checkWriteRequest}
 * @param segmentTxId txid of the segment the writer believes is open;
 *        compared against {@code curSegmentTxId}
 * @param firstTxnId txid of the first transaction in the batch; checked
 *        against {@code nextTxId}
 * @param numTxns number of transactions in the batch
 * @param records raw bytes appended to the current segment
 * @throws IOException propagated from the checks, the write or the flush;
 *         a JournalOutOfSyncException is thrown, after aborting the current
 *         segment, when the segment ids disagree
 * @see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])
 */
synchronized void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  checkFormatted();
  checkWriteRequest(reqInfo);

  checkSync(curSegment != null,
      "Can't write, no segment open");

  if (curSegmentTxId != segmentTxId) {
    // Sanity check: it is possible that the writer will fail IPCs
    // on both the finalize() and then the start() of the next segment.
    // This could cause us to continue writing to an old segment
    // instead of rolling to a new one, which breaks one of the
    // invariants in the design. If it happens, abort the segment
    // and throw an exception.
    JournalOutOfSyncException e = new JournalOutOfSyncException(
        "Writer out of sync: it thinks it is writing segment " + segmentTxId
        + " but current segment is " + curSegmentTxId);
    abortCurSegment();
    throw e;
  }

  checkSync(nextTxId == firstTxnId,
      "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);

  // Inclusive id of the last transaction in this batch.
  long lastTxnId = firstTxnId + numTxns - 1;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
  }

  // If the edit has already been marked as committed, we know
  // it has been fsynced on a quorum of other nodes, and we are
  // "catching up" with the rest. Hence we do not need to fsync.
  boolean isLagging = lastTxnId <= committedTxnId.get();
  boolean shouldFsync = !isLagging;

  curSegment.writeRaw(records, 0, records.length);
  curSegment.setReadyToFlush();
  // Only the flush call is timed.
  StopWatch sw = new StopWatch();
  sw.start();
  curSegment.flush(shouldFsync);
  sw.stop();

  // Elapsed time; the conversions below treat it as nanoseconds.
  long nanoSeconds = sw.now();
  metrics.addSync(
      TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
  long milliSeconds = TimeUnit.MILLISECONDS.convert(
      nanoSeconds, TimeUnit.NANOSECONDS);

  if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
    LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
             " took " + milliSeconds + "ms");
  }

  if (isLagging) {
    // This batch of edits has already been committed on a quorum of other
    // nodes. So, we are in "catch up" mode. This gets its own metric.
    metrics.batchesWrittenWhileLagging.incr(1);
  }

  metrics.batchesWritten.incr(1);
  metrics.bytesWritten.incr(records.length);
  metrics.txnsWritten.incr(numTxns);

  // Advance the bookkeeping past the batch that was just written.
  highestWrittenTxId = lastTxnId;
  nextTxId = lastTxnId + 1;
}
项目:aliyun-oss-hadoop-fs    文件:FileInputFormat.java   
/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return list of FileStatus objects
 * @throws IOException if the job has no input paths, or if listing fails
 *         or is interrupted
 */
protected List<FileStatus> listStatus(JobContext job
                                      ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                      job.getConfiguration());

  // Whether we need to recursive look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  List<FileStatus> result;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    // Any other thread count goes through the LocatedFileStatusFetcher.
    Iterable<FileStatus> locatedFiles;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      // Restore the interrupt flag for callers and keep the original
      // exception attached as the cause.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while getting file statuses", e);
    }
    result = Lists.newArrayList(locatedFiles);
  }

  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: "
        + sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input files to process : " + result.size());
  return result;
}
项目:aliyun-oss-hadoop-fs    文件:Journal.java   
/**
 * Write a batch of edits to the journal.
 *
 * <p>Runs the request and segment/txid checks, appends {@code records} to
 * the current segment, flushes it, and then updates the sync-latency and
 * write metrics, the txid counters and the last-journal timestamp.
 *
 * @param reqInfo request metadata, passed to {@code checkWriteRequest}
 * @param segmentTxId txid of the segment the writer believes is open;
 *        compared against {@code curSegmentTxId}
 * @param firstTxnId txid of the first transaction in the batch; checked
 *        against {@code nextTxId}
 * @param numTxns number of transactions in the batch
 * @param records raw bytes appended to the current segment
 * @throws IOException propagated from the checks, the write or the flush;
 *         a JournalOutOfSyncException is thrown, after aborting the current
 *         segment, when the segment ids disagree
 * @see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])
 */
synchronized void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  checkFormatted();
  checkWriteRequest(reqInfo);

  checkSync(curSegment != null,
      "Can't write, no segment open");

  if (curSegmentTxId != segmentTxId) {
    // Sanity check: it is possible that the writer will fail IPCs
    // on both the finalize() and then the start() of the next segment.
    // This could cause us to continue writing to an old segment
    // instead of rolling to a new one, which breaks one of the
    // invariants in the design. If it happens, abort the segment
    // and throw an exception.
    JournalOutOfSyncException e = new JournalOutOfSyncException(
        "Writer out of sync: it thinks it is writing segment " + segmentTxId
        + " but current segment is " + curSegmentTxId);
    abortCurSegment();
    throw e;
  }

  checkSync(nextTxId == firstTxnId,
      "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);

  // Inclusive id of the last transaction in this batch.
  long lastTxnId = firstTxnId + numTxns - 1;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
  }

  // If the edit has already been marked as committed, we know
  // it has been fsynced on a quorum of other nodes, and we are
  // "catching up" with the rest. Hence we do not need to fsync.
  boolean isLagging = lastTxnId <= committedTxnId.get();
  boolean shouldFsync = !isLagging;

  curSegment.writeRaw(records, 0, records.length);
  curSegment.setReadyToFlush();
  // Only the flush call is timed.
  StopWatch sw = new StopWatch();
  sw.start();
  curSegment.flush(shouldFsync);
  sw.stop();

  // Elapsed time; the conversions below treat it as nanoseconds.
  long nanoSeconds = sw.now();
  metrics.addSync(
      TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
  long milliSeconds = TimeUnit.MILLISECONDS.convert(
      nanoSeconds, TimeUnit.NANOSECONDS);

  if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
    LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
             " took " + milliSeconds + "ms");
  }

  if (isLagging) {
    // This batch of edits has already been committed on a quorum of other
    // nodes. So, we are in "catch up" mode. This gets its own metric.
    metrics.batchesWrittenWhileLagging.incr(1);
  }

  metrics.batchesWritten.incr(1);
  metrics.bytesWritten.incr(records.length);
  metrics.txnsWritten.incr(numTxns);

  // Advance the bookkeeping past the batch that was just written and
  // record when it happened.
  updateHighestWrittenTxId(lastTxnId);
  nextTxId = lastTxnId + 1;
  lastJournalTimestamp = Time.now();
}
项目:big-c    文件:FileInputFormat.java   
/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return list of FileStatus objects
 * @throws IOException if the job has no input paths, or if listing fails
 *         or is interrupted
 */
protected List<FileStatus> listStatus(JobContext job
                                      ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                      job.getConfiguration());

  // Whether we need to recursive look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  List<FileStatus> result;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    // Any other thread count goes through the LocatedFileStatusFetcher.
    Iterable<FileStatus> locatedFiles;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      // Restore the interrupt flag for callers and keep the original
      // exception attached as the cause.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while getting file statuses", e);
    }
    result = Lists.newArrayList(locatedFiles);
  }

  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: "
        + sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input paths to process : " + result.size());
  return result;
}
项目:big-c    文件:Journal.java   
/**
 * Write a batch of edits to the journal.
 *
 * <p>Runs the request and segment/txid checks, appends {@code records} to
 * the current segment, flushes it, and then updates the sync-latency and
 * write metrics along with the txid counters.
 *
 * @param reqInfo request metadata, passed to {@code checkWriteRequest}
 * @param segmentTxId txid of the segment the writer believes is open;
 *        compared against {@code curSegmentTxId}
 * @param firstTxnId txid of the first transaction in the batch; checked
 *        against {@code nextTxId}
 * @param numTxns number of transactions in the batch
 * @param records raw bytes appended to the current segment
 * @throws IOException propagated from the checks, the write or the flush;
 *         a JournalOutOfSyncException is thrown, after aborting the current
 *         segment, when the segment ids disagree
 * @see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])
 */
synchronized void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  checkFormatted();
  checkWriteRequest(reqInfo);

  checkSync(curSegment != null,
      "Can't write, no segment open");

  if (curSegmentTxId != segmentTxId) {
    // Sanity check: it is possible that the writer will fail IPCs
    // on both the finalize() and then the start() of the next segment.
    // This could cause us to continue writing to an old segment
    // instead of rolling to a new one, which breaks one of the
    // invariants in the design. If it happens, abort the segment
    // and throw an exception.
    JournalOutOfSyncException e = new JournalOutOfSyncException(
        "Writer out of sync: it thinks it is writing segment " + segmentTxId
        + " but current segment is " + curSegmentTxId);
    abortCurSegment();
    throw e;
  }

  checkSync(nextTxId == firstTxnId,
      "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);

  // Inclusive id of the last transaction in this batch.
  long lastTxnId = firstTxnId + numTxns - 1;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
  }

  // If the edit has already been marked as committed, we know
  // it has been fsynced on a quorum of other nodes, and we are
  // "catching up" with the rest. Hence we do not need to fsync.
  boolean isLagging = lastTxnId <= committedTxnId.get();
  boolean shouldFsync = !isLagging;

  curSegment.writeRaw(records, 0, records.length);
  curSegment.setReadyToFlush();
  // Only the flush call is timed.
  StopWatch sw = new StopWatch();
  sw.start();
  curSegment.flush(shouldFsync);
  sw.stop();

  // Elapsed time; the conversions below treat it as nanoseconds.
  long nanoSeconds = sw.now();
  metrics.addSync(
      TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
  long milliSeconds = TimeUnit.MILLISECONDS.convert(
      nanoSeconds, TimeUnit.NANOSECONDS);

  if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
    LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
             " took " + milliSeconds + "ms");
  }

  if (isLagging) {
    // This batch of edits has already been committed on a quorum of other
    // nodes. So, we are in "catch up" mode. This gets its own metric.
    metrics.batchesWrittenWhileLagging.incr(1);
  }

  metrics.batchesWritten.incr(1);
  metrics.bytesWritten.incr(records.length);
  metrics.txnsWritten.incr(numTxns);

  // Advance the bookkeeping past the batch that was just written.
  highestWrittenTxId = lastTxnId;
  nextTxId = lastTxnId + 1;
}
项目:hops    文件:FileInputFormat.java   
/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return list of FileStatus objects
 * @throws IOException if the job has no input paths, or if listing fails
 *         or is interrupted
 */
protected List<FileStatus> listStatus(JobContext job
                                      ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                      job.getConfiguration());

  // Whether we need to recursive look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  List<FileStatus> result;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    // Any other thread count goes through the LocatedFileStatusFetcher.
    Iterable<FileStatus> locatedFiles;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      // Restore the interrupt flag for callers and keep the original
      // exception attached as the cause.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while getting file statuses", e);
    }
    result = Lists.newArrayList(locatedFiles);
  }

  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: "
        + sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input files to process : " + result.size());
  return result;
}
项目:hops    文件:RawErasureCoderBenchmark.java   
/**
 * Runs the raw erasure coder benchmark and prints the results.
 *
 * <p>One benchmark task is submitted per thread; the method waits for all
 * of them, then prints the total data size, the wall-clock time, the
 * aggregate throughput and the per-thread statistics.
 *
 * @param opType      The operation to perform; "encode" selects encoding,
 *                    any other value selects decoding
 * @param coder       The coder to use
 * @param numThreads  Number of threads to launch concurrently
 * @param dataSizeMB  Total test data size in MB
 * @param chunkSizeKB Chunk size in KB
 * @throws Exception  rethrown if waiting for a benchmark task fails
 */
public static void performBench(String opType, CODER coder,
    int numThreads, int dataSizeMB, int chunkSizeKB) throws Exception {
  BenchData.configure(dataSizeMB, chunkSizeKB);

  final boolean isEncode = opType.equals("encode");
  RawErasureEncoder encoder = null;
  RawErasureDecoder decoder = null;
  final boolean preferDirect;
  // Only the coder matching the operation is created; the other stays null.
  if (isEncode) {
    encoder = getRawEncoder(coder.ordinal());
    preferDirect = encoder.preferDirectBuffer();
  } else {
    decoder = getRawDecoder(coder.ordinal());
    preferDirect = decoder.preferDirectBuffer();
  }
  ByteBuffer testData = genTestData(preferDirect, BenchData.bufferSizeKB);

  ExecutorService pool = Executors.newFixedThreadPool(numThreads);
  List<Future<Long>> pending = new ArrayList<>(numThreads);
  StopWatch wallClock = new StopWatch().start();
  for (int worker = 0; worker < numThreads; worker++) {
    // Each task works on its own duplicate of the test buffer.
    BenchmarkCallable task = new BenchmarkCallable(isEncode,
        encoder, decoder, testData.duplicate());
    pending.add(pool.submit(task));
  }

  List<Long> perThreadMs = new ArrayList<>(numThreads);
  try {
    for (Future<Long> outcome : pending) {
      perThreadMs.add(outcome.get());
    }
    long elapsedMs = wallClock.now(TimeUnit.MILLISECONDS);
    double totalMB = BenchData.totalDataSizeKB * numThreads / 1024.0;
    DecimalFormat fmt = new DecimalFormat("#.##");

    String sizeText = fmt.format(totalMB);
    System.out.println(coder + " " + opType + " " + sizeText
        + "MB data, with chunk size " + BenchData.chunkSize / 1024 + "KB");

    String timeText = fmt.format(elapsedMs / 1000.0);
    System.out.println("Total time: " + timeText + " s.");

    String rateText = fmt.format(totalMB / elapsedMs * 1000.0);
    System.out.println("Total throughput: " + rateText + " MB/s");

    printThreadStatistics(perThreadMs, fmt);
  } catch (Exception e) {
    System.out.println("Error waiting for thread to finish.");
    e.printStackTrace();
    throw e;
  } finally {
    pool.shutdown();
  }
}