Example source code for the Java class org.apache.hadoop.mapred.Reporter
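The snippets on this page are collected verbatim from open-source projects (the project and file are noted above each one). As a quick orientation, here is a minimal, self-contained sketch of the Reporter facilities those snippets rely on: named counters via incrCounter and getCounter(...).increment(...), free-form status text via setStatus, and liveness signalling via progress(). The class name and counter names are illustrative assumptions, not taken from any listed project.

// A minimal sketch; class name, counter group and counter names are illustrative assumptions.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ReporterSketchMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final IntWritable one = new IntWritable(1);
  private final Text word = new Text();

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Counters: either increment through incrCounter ...
    reporter.incrCounter("ReporterSketch", "records", 1L);
    // ... or fetch the Counters.Counter object and increment it directly,
    // as the TestTableInputFormat snippet below does.
    reporter.getCounter("ReporterSketch", "bytes").increment(value.getLength());
    // Status text is shown per task attempt in the job UI.
    reporter.setStatus("processing input offset " + key.get());
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, one);
      // progress() only signals liveness, so slow records do not get the task killed as hung.
      reporter.progress();
    }
  }
}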

Project: ditb    File: TestTableInputFormat.java
@Override
public void map(ImmutableBytesWritable key, Result value,
    OutputCollector<NullWritable,NullWritable> output,
    Reporter reporter) throws IOException {
  for (Cell cell : value.listCells()) {
    reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
        Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
        .increment(1l);
    reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
        .increment(1l);
    reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
        .increment(1l);
  }
}
Project: hadoop    File: TestDFSIO.java
@Override // IOMapperBase
public Long doIO(Reporter reporter, 
                   String name, 
                   long totalSize // in bytes
                 ) throws IOException {
  OutputStream out = (OutputStream)this.stream;
  // write to the file
  long nrRemaining;
  for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
    int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
    out.write(buffer, 0, curSize);
    reporter.setStatus("writing " + name + "@" + 
                       (totalSize - nrRemaining) + "/" + totalSize 
                       + " ::host = " + hostName);
  }
  return Long.valueOf(totalSize);
}
Project: hadoop    File: TestDFSIO.java
@Override // IOMapperBase
public Long doIO(Reporter reporter, 
                   String name, 
                   long newLength // in bytes
                 ) throws IOException {
  boolean isClosed = fs.truncate(filePath, newLength);
  reporter.setStatus("truncating " + name + " to newLength " + 
      newLength  + " ::host = " + hostName);
  for(int i = 0; !isClosed; i++) {
    try {
      Thread.sleep(DELAY);
    } catch (InterruptedException ignored) {}
    FileStatus status = fs.getFileStatus(filePath);
    assert status != null : "status is null";
    isClosed = (status.getLen() == newLength);
    reporter.setStatus("truncate recover for " + name + " to newLength " + 
        newLength + " attempt " + i + " ::host = " + hostName);
  }
  return Long.valueOf(fileSize - newLength);
}
Project: hadoop    File: DataJoinReducerBase.java
public void reduce(Object key, Iterator values,
                   OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }

  SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
  Object[] tags = groups.keySet().toArray();
  ResetableIterator[] groupValues = new ResetableIterator[tags.length];
  for (int i = 0; i < tags.length; i++) {
    groupValues[i] = groups.get(tags[i]);
  }
  joinAndCollect(tags, groupValues, key, output, reporter);
  addLongValue("groupCount", 1);
  for (int i = 0; i < tags.length; i++) {
    groupValues[i].close();
  }
}
Project: ditb    File: TestTableMapReduceUtil.java
@Override
public void map(ImmutableBytesWritable row, Result result,
    OutputCollector<ImmutableBytesWritable, Put> outCollector,
    Reporter reporter) throws IOException {
  String rowKey = Bytes.toString(result.getRow());
  final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
      Bytes.toBytes(PRESIDENT_PATTERN));
  final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
      Bytes.toBytes(ACTOR_PATTERN));
  ImmutableBytesWritable outKey = null;

  if (rowKey.startsWith(PRESIDENT_PATTERN)) {
    outKey = pKey;
  } else if (rowKey.startsWith(ACTOR_PATTERN)) {
    outKey = aKey;
  } else {
    throw new AssertionError("unexpected rowKey");
  }

  String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
      COLUMN_QUALIFIER));
  outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add(
      COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
}
Project: hadoop    File: TestDBInputFormat.java
/**
 * Tests the DBInputFormat class. The class should split the result into chunks.
 * @throws Exception
 */
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
  JobConf configuration = new JobConf();
  setupDriver(configuration);

  DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
  format.setConf(configuration);
  format.setConf(configuration);
  DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
  Reporter reporter = mock(Reporter.class);
  RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader(
      splitter, configuration, reporter);

  configuration.setInt(MRJobConfig.NUM_MAPS, 3);
  InputSplit[] lSplits = format.getSplits(configuration, 3);
  assertEquals(5, lSplits[0].getLength());
  assertEquals(3, lSplits.length);

  // test reader. Some simple tests
  assertEquals(LongWritable.class, reader.createKey().getClass());
  assertEquals(0, reader.getPos());
  assertEquals(0, reader.getProgress(), 0.001);
  reader.close();
}
Project: ditb    File: TestIdentityTableMap.java
@Test
@SuppressWarnings({ "deprecation", "unchecked" })
public void shouldCollectPredefinedTimes() throws IOException {
  int recordNumber = 999;
  Result resultMock = mock(Result.class);
  IdentityTableMap identityTableMap = null;
  try {
    Reporter reporterMock = mock(Reporter.class);
    identityTableMap = new IdentityTableMap();
    ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class);
    OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
        mock(OutputCollector.class);

    for (int i = 0; i < recordNumber; i++)
      identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock,
          reporterMock);

    verify(outputCollectorMock, times(recordNumber)).collect(
        Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
  } finally {
    if (identityTableMap != null)
      identityTableMap.close();
  }
}
Project: hadoop    File: PipesReducer.java
/**
 * Process all of the keys and values. Start up the application if we haven't
 * started it yet.
 */
public void reduce(K2 key, Iterator<V2> values, 
                   OutputCollector<K3, V3> output, Reporter reporter
                   ) throws IOException {
  isOk = false;
  startApplication(output, reporter);
  downlink.reduceKey(key);
  while (values.hasNext()) {
    downlink.reduceValue(values.next());
  }
  if(skipping) {
    //flush the streams on every record input if running in skip mode
    //so that we don't buffer other records surrounding a bad record.
    downlink.flush();
  }
  isOk = true;
}
Project: hadoop    File: PipesReducer.java
@SuppressWarnings("unchecked")
private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
  if (application == null) {
    try {
      LOG.info("starting application");
      application = 
        new Application<K2, V2, K3, V3>(
            job, null, output, reporter, 
            (Class<? extends K3>) job.getOutputKeyClass(), 
            (Class<? extends V3>) job.getOutputValueClass());
      downlink = application.getDownlink();
    } catch (InterruptedException ie) {
      throw new RuntimeException("interrupted", ie);
    }
    int reduce=0;
    downlink.runReduce(reduce, Submitter.getIsJavaRecordWriter(job));
  }
}
Project: hadoop    File: HadoopArchives.java
public void reduce(IntWritable key, Iterator<Text> values,
    OutputCollector<Text, Text> out,
    Reporter reporter) throws IOException {
  keyVal = key.get();
  while(values.hasNext()) {
    Text value = values.next();
    String towrite = value.toString() + "\n";
    indexStream.write(towrite.getBytes(Charsets.UTF_8));
    written++;
    if (written > numIndexes -1) {
      // every 1000 indexes we report status
      reporter.setStatus("Creating index for archives");
      reporter.progress();
      endIndex = keyVal;
      String masterWrite = startIndex + " " + endIndex + " " + startPos 
                          +  " " + indexStream.getPos() + " \n" ;
      outStream.write(masterWrite.getBytes(Charsets.UTF_8));
      startPos = indexStream.getPos();
      startIndex = endIndex;
      written = 0;
    }
  }
}
Project: hadoop    File: ValueAggregatorCombiner.java
/** Combines values for a given key.  
 * @param key the key is expected to be a Text object, whose prefix indicates
 * the type of aggregation to aggregate the values. 
 * @param values the values to combine
 * @param output to collect combined values
 */
public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  String keyStr = key.toString();
  int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
  String type = keyStr.substring(0, pos);
  ValueAggregator aggregator = ValueAggregatorBaseDescriptor
    .generateValueAggregator(type);
  while (values.hasNext()) {
    aggregator.addNextValue(values.next());
  }
  Iterator outputs = aggregator.getCombinerOutput().iterator();

  while (outputs.hasNext()) {
    Object v = outputs.next();
    if (v instanceof Text) {
      output.collect(key, (Text)v);
    } else {
      output.collect(key, new Text(v.toString()));
    }
  }
}
Project: hadoop    File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
Project: ditb    File: TestTableMapReduce.java
/**
 * Pass the key, and reversed value to reduce
 */
public void map(ImmutableBytesWritable key, Result value,
  OutputCollector<ImmutableBytesWritable, Put> output,
  Reporter reporter)
throws IOException {
  output.collect(key, TestTableMapReduceBase.map(key, value));
}
Project: hadoop    File: HadoopArchives.java
public void copyData(Path input, FSDataInputStream fsin, 
    FSDataOutputStream fout, Reporter reporter) throws IOException {
  try {
    for (int cbread=0; (cbread = fsin.read(buffer))>= 0;) {
      fout.write(buffer, 0,cbread);
      reporter.progress();
    }
  } finally {
    fsin.close();
  }
}
Project: hadoop    File: TestFetcher.java
@SuppressWarnings("unchecked")
@Test(timeout=10000) 
public void testCopyFromHostCompressFailure() throws Exception {
  InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);

  Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
      r, metrics, except, key, connection);

  String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);

  when(connection.getResponseCode()).thenReturn(200);
  when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
      .thenReturn(replyHash);
  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  header.write(new DataOutputStream(bout));
  ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
  when(connection.getInputStream()).thenReturn(in);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
  when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
      .thenReturn(immo);

  doThrow(new java.lang.InternalError()).when(immo)
      .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), 
          anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));

  underTest.copyFromHost(host);

  verify(connection)
      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
        encHash);
  verify(ss, times(1)).copyFailed(map1ID, host, true, false);
}
Project: aliyun-maxcompute-data-collectors    File: ExplicitSetMapper.java
public void map(LongWritable key, Text val,
    OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {

  // Try to set the field.
  userRecord.setField(setCol, setVal);
  Map<String, Object> fieldVals = userRecord.getFieldMap();
  if (!fieldVals.get(setCol).equals(setVal)) {
    throw new IOException("Could not set column value! Got back "
        + fieldVals.get(setCol));
  } else {
    LOG.info("Correctly changed value for col " + setCol + " to " + setVal);
  }
}
Project: aliyun-maxcompute-data-collectors    File: ReparseMapper.java
public void map(LongWritable key, Text val,
    OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {

  LOG.info("Mapper input line: " + val.toString());

  try {
    // Use the user's record class to parse the line back in.
    userRecord.parse(val);
  } catch (RecordParser.ParseError pe) {
    LOG.error("Got parse error: " + pe.toString());
    throw new IOException(pe);
  }

  LOG.info("Mapper output line: " + userRecord.toString());

  out.collect(new Text(userRecord.toString()), NullWritable.get());

  if (!userRecord.toString(false).equals(val.toString())) {
    // Could not format record w/o end-of-record delimiter.
    throw new IOException("Returned string w/o EOR has value ["
        + userRecord.toString(false) + "] when ["
        + val.toString() + "] was expected.");
  }

  if (!userRecord.toString().equals(val.toString() + "\n")) {
    // misparsed.
    throw new IOException("Returned string has value ["
        + userRecord.toString() + "] when ["
        + val.toString() + "\n] was expected.");
  }
}
Project: hadoop    File: TestDFSIO.java
private void sequentialTest(FileSystem fs, 
                            TestType testType, 
                            long fileSize, // in bytes
                            int nrFiles
                           ) throws IOException {
  IOStatMapper ioer = null;
  switch(testType) {
  case TEST_TYPE_READ:
    ioer = new ReadMapper();
    break;
  case TEST_TYPE_WRITE:
    ioer = new WriteMapper();
    break;
  case TEST_TYPE_APPEND:
    ioer = new AppendMapper();
    break;
  case TEST_TYPE_READ_RANDOM:
  case TEST_TYPE_READ_BACKWARD:
  case TEST_TYPE_READ_SKIP:
    ioer = new RandomReadMapper();
    break;
  case TEST_TYPE_TRUNCATE:
    ioer = new TruncateMapper();
    break;
  default:
    return;
  }
  for(int i=0; i < nrFiles; i++)
    ioer.doIO(Reporter.NULL,
              BASE_FILE_NAME+Integer.toString(i), 
              fileSize);
}
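sequentialTest above drives the mappers directly and passes Reporter.NULL, the built-in no-op Reporter, instead of a live task reporter. The same approach works for exercising old-API map() implementations outside the MapReduce framework. Below is a minimal sketch, reusing the hypothetical ReporterSketchMapper from the top of this page together with a throwaway OutputCollector.

// A minimal sketch, assuming the hypothetical ReporterSketchMapper defined earlier on this page.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ReporterNullDemo {
  public static void main(String[] args) throws IOException {
    final List<String> collected = new ArrayList<String>();
    // Collect into a list instead of a real MapReduce output.
    OutputCollector<Text, IntWritable> collector = new OutputCollector<Text, IntWritable>() {
      @Override
      public void collect(Text key, IntWritable value) {
        collected.add(key + "\t" + value);
      }
    };
    // Reporter.NULL silently discards status, counter and progress calls,
    // so the mapper can be invoked without a running task tracker.
    new ReporterSketchMapper().map(new LongWritable(0L), new Text("hello reporter"),
        collector, Reporter.NULL);
    System.out.println(collected);
  }
}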
Project: hadoop    File: SliveReducer.java
@Override // Reducer
public void reduce(Text key, Iterator<Text> values,
    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  OperationOutput collector = null;
  int reduceAm = 0;
  int errorAm = 0;
  logAndSetStatus(reporter, "Iterating over reduction values for key " + key);
  while (values.hasNext()) {
    Text value = values.next();
    try {
      OperationOutput val = new OperationOutput(key, value);
      if (collector == null) {
        collector = val;
      } else {
        collector = OperationOutput.merge(collector, val);
      }
      LOG.info("Combined " + val + " into/with " + collector);
      ++reduceAm;
    } catch (Exception e) {
      ++errorAm;
      logAndSetStatus(reporter, "Error iterating over reduction input "
          + value + " due to : " + StringUtils.stringifyException(e));
      if (getConfig().shouldExitOnFirstError()) {
        break;
      }
    }
  }
  logAndSetStatus(reporter, "Reduced " + reduceAm + " values with " + errorAm
      + " errors");
  if (collector != null) {
    logAndSetStatus(reporter, "Writing output " + collector.getKey() + " : "
        + collector.getOutputValue());
    output.collect(collector.getKey(), collector.getOutputValue());
  }
}
Project: hadoop    File: DummyInputFormat.java
public RecordReader<Object, Object> getRecordReader(InputSplit split,
    JobConf job, Reporter reporter) throws IOException {
  return new RecordReader<Object, Object>() {

    boolean once = false;

    public boolean next(Object key, Object value) throws IOException {
      if (!once) {
        once = true;
        return true;
      }
      return false;
    }

    public Object createKey() {
      return new Object();
    }

    public Object createValue() {
      return new Object();
    }

    public long getPos() throws IOException {
      return 0L;
    }

    public void close() throws IOException {
    }

    public float getProgress() throws IOException {
      return 0.0f;
    }
  };
}
Project: hadoop    File: SliveMapper.java
/**
 * Runs the given operation and reports on its results
 * 
 * @param op
 *          the operation to run
 * @param reporter
 *          the status reporter to notify
 * @param output
 *          the output to write to
 * @throws IOException
 */
private void runOperation(Operation op, Reporter reporter,
    OutputCollector<Text, Text> output, long opNum) throws IOException {
  if (op == null) {
    return;
  }
  logAndSetStatus(reporter, "Running operation #" + opNum + " (" + op + ")");
  List<OperationOutput> opOut = op.run(filesystem);
  logAndSetStatus(reporter, "Finished operation #" + opNum + " (" + op + ")");
  if (opOut != null && !opOut.isEmpty()) {
    for (OperationOutput outData : opOut) {
      output.collect(outData.getKey(), outData.getOutputValue());
    }
  }
}
Project: ditb    File: TestRowCounter.java
@Test
@SuppressWarnings({ "deprecation", "unchecked" })
public void shouldRegInReportEveryIncomingRow() throws IOException {
  int iterationNumber = 999;
  RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
  Reporter reporter = mock(Reporter.class);
  for (int i = 0; i < iterationNumber; i++)
    mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
        mock(OutputCollector.class), reporter);

  Mockito.verify(reporter, times(iterationNumber)).incrCounter(
      any(Enum.class), anyInt());
}
Project: hadoop    File: PipeMapRed.java
MROutputThread(OutputReader outReader, OutputCollector outCollector,
  Reporter reporter) {
  setDaemon(true);
  this.outReader = outReader;
  this.outCollector = outCollector;
  this.reporter = reporter;
}
Project: hadoop    File: MRCaching.java
public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
  String line = value.toString();
  StringTokenizer itr = new StringTokenizer(line);
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    output.collect(word, one);
  }

}
Project: hadoop    File: MRCaching.java
public void reduce(Text key, Iterator<IntWritable> values,
                   OutputCollector<Text, IntWritable> output,
                   Reporter reporter) throws IOException {
  int sum = 0;
  while (values.hasNext()) {
    sum += values.next().get();
  }
  output.collect(key, new IntWritable(sum));
}
Project: hadoop    File: WordCount.java
public void map(LongWritable key, Text value, 
                OutputCollector<Text, IntWritable> output, 
                Reporter reporter) throws IOException {
  String line = value.toString();
  StringTokenizer itr = new StringTokenizer(line);
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    output.collect(word, one);
  }
}
Project: hadoop    File: PipeMapRunner.java
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
       throws IOException {
  PipeMapper pipeMapper = (PipeMapper)getMapper();
  pipeMapper.startOutputThreads(output, reporter);
  super.run(input, output, reporter);
}
Project: hadoop    File: TestStreamingOutputKeyValueTypes.java
public void reduce(K key, Iterator<V> values,
    OutputCollector<LongWritable, Text> output, Reporter reporter)
    throws IOException {
  LongWritable l = new LongWritable();
  while (values.hasNext()) {
    output.collect(l, new Text(values.next().toString()));
  }
}
Project: hadoop    File: TestDatamerge.java
public void reduce(IntWritable key, Iterator<IntWritable> values,
                   OutputCollector<Text, Text> output,
                   Reporter reporter) throws IOException {
  int seen = 0;
  while (values.hasNext()) {
    seen += values.next().get();
  }
  assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
}
Project: hadoop    File: TestDatamerge.java
public void map(IntWritable key, TupleWritable val,
    OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
    throws IOException {
  int k = key.get();
  final String kvstr = "Unexpected tuple: " + stringify(key, val);
  assertTrue(kvstr, 0 == k % (srcs * srcs));
  for (int i = 0; i < val.size(); ++i) {
    final int vali = ((IntWritable)val.get(i)).get();
    assertTrue(kvstr, (vali - i) * srcs == 10 * k);
  }
  out.collect(key, one);
}
Project: hadoop    File: TestDatamerge.java
public void map(IntWritable key, IntWritable val,
    OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
    throws IOException {
  int k = key.get();
  final int vali = val.get();
  final String kvstr = "Unexpected tuple: " + stringify(key, val);
  if (0 == k % (srcs * srcs)) {
    assertTrue(kvstr, vali == k * 10 / srcs + srcs - 1);
  } else {
    final int i = k % srcs;
    assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
  }
  out.collect(key, one);
}
Project: hadoop    File: TestCombineFileRecordReader.java
@SuppressWarnings("unchecked")
@Test
public void testInitNextRecordReader() throws IOException{
  JobConf conf = new JobConf();
  Path[] paths = new Path[3];
  long[] fileLength = new long[3];
  File[] files = new File[3];
  LongWritable key = new LongWritable(1);
  Text value = new Text();
  try {
    for(int i=0;i<3;i++){
      fileLength[i] = i;
      File dir = new File(outDir.toString());
      dir.mkdir();
      files[i] = new File(dir,"testfile"+i);
      FileWriter fileWriter = new FileWriter(files[i]);
      fileWriter.close();
      paths[i] = new Path(outDir+"/testfile"+i);
    }
    CombineFileSplit combineFileSplit = new CombineFileSplit(conf, paths, fileLength);
    Reporter reporter = Mockito.mock(Reporter.class);
    CombineFileRecordReader cfrr = new CombineFileRecordReader(conf, combineFileSplit,
      reporter,  TextRecordReaderWrapper.class);
    verify(reporter).progress();
    Assert.assertFalse(cfrr.next(key,value));
    verify(reporter, times(3)).progress();
  } finally {
    FileUtil.fullyDelete(new File(outDir.toString()));
  }

}
Project: ditb    File: IdentityTableReduce.java
/**
 * No aggregation, output pairs of (key, record)
 * @param key
 * @param values
 * @param output
 * @param reporter
 * @throws IOException
 */
public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
    OutputCollector<ImmutableBytesWritable, Put> output,
    Reporter reporter)
    throws IOException {

  while(values.hasNext()) {
    output.collect(key, values.next());
  }
}
Project: hadoop    File: TestInputSampler.java
@Override
public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
    getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
        JobConf job, Reporter reporter) throws IOException {
  return new org.apache.hadoop.mapred.RecordReader
      <IntWritable, NullWritable>() {
    private final IntWritable i =
        new IntWritable(((MapredSequentialSplit)split).getInit());
    private int maxVal = i.get() + maxDepth + 1;

    @Override
    public boolean next(IntWritable key, NullWritable value)
        throws IOException {
      i.set(i.get() + 1);
      return i.get() < maxVal;
    }
    @Override
    public IntWritable createKey() {
      return new IntWritable(i.get());
    }
    @Override
    public NullWritable createValue() {
      return NullWritable.get();
    }
    @Override
    public long getPos() throws IOException {
      return 0;
    }
    @Override
    public void close() throws IOException {
    }
    @Override
    public float getProgress() throws IOException {
      return 0;
    }
  };
}
Project: hadoop    File: ValueCountReduce.java
public void reduce(Object arg0, Iterator arg1, OutputCollector arg2, Reporter arg3) throws IOException {
  int count = 0;
  while (arg1.hasNext()) {
    count += 1;
    arg1.next();
  }
  arg2.collect(arg0, new Text("" + count));
}
Project: hadoop    File: NNBench.java
/**
 * Delete operation
 * @param name prefix of the files to be deleted
 * @param reporter an instance of {@link Reporter} to be used for
 *   status updates
 */
private void doDeleteOp(String name,
                        Reporter reporter) {
  for (long l = 0l; l < numberOfFiles; l++) {
    Path filePath = new Path(new Path(baseDir, dataDirName), 
            name + "_" + l);

    boolean successfulOp = false;
    while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
      try {
        // Set up timer for measuring AL
        startTimeAL = System.currentTimeMillis();
        filesystem.delete(filePath, true);
        totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);

        successfulOp = true;
        successfulFileOps ++;

        reporter.setStatus("Finish "+ l + " files");
      } catch (IOException e) {
        LOG.info("Exception in recorded op: Delete");

        numOfExceptions++;
      }
    }
  }
}
Project: hadoop    File: TestFetcher.java
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testCopyFromHostWithRetryThenTimeout() throws Exception {
  InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
  Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
      id, ss, mm, r, metrics, except, key, connection);

  String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);

  when(connection.getResponseCode()).thenReturn(200)
    .thenThrow(new SocketTimeoutException("forced timeout"));
  when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
      .thenReturn(replyHash);
  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  header.write(new DataOutputStream(bout));
  ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
  when(connection.getInputStream()).thenReturn(in);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
  when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
      .thenReturn(immo);
  doThrow(new IOException("forced error")).when(immo).shuffle(
      any(MapHost.class), any(InputStream.class), anyLong(),
      anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));

  underTest.copyFromHost(host);
  verify(allErrs).increment(1);
  verify(ss).copyFailed(map1ID, host, false, false);
}
Project: hadoop    File: ExternalIdentityReducer.java
public void reduce(WritableComparable key, Iterator<Writable> values,
                   OutputCollector<WritableComparable, Writable> output,
                   Reporter reporter)
  throws IOException {

  while (values.hasNext()) {
    output.collect(key, values.next());
  }
}
Project: hadoop    File: ExternalMapperReducer.java
public void map(WritableComparable key, Writable value,
                OutputCollector<ExternalWritable, IntWritable> output,
                Reporter reporter)
  throws IOException {

  if (value instanceof Text) {
    Text text = (Text)value;
    ExternalWritable ext = new ExternalWritable(text.toString());
    output.collect(ext, new IntWritable(1));
  }
}