Example source code for the Java class org.apache.hadoop.mapred.RecordReader
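
All of the snippets collected below use the old MapReduce API contract: an org.apache.hadoop.mapred.InputFormat hands out one RecordReader per InputSplit, and the caller drives it through createKey()/createValue(), next(key, value), getPos()/getProgress() and close(). As a point of reference, here is a minimal sketch of that contract; the class name and the in-memory record source are illustrative only and do not come from any of the projects listed below.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

// Illustrative only: serves records out of an in-memory array instead of a real split.
public class InMemoryRecordReader implements RecordReader<LongWritable, Text> {

  private final String[] lines;
  private int pos = 0;

  public InMemoryRecordReader(String[] lines) {
    this.lines = lines;
  }

  public boolean next(LongWritable key, Text value) throws IOException {
    if (pos >= lines.length) {
      return false;               // no more records in this "split"
    }
    key.set(pos);                 // record index as the key
    value.set(lines[pos]);        // record payload as the value
    pos++;
    return true;
  }

  public LongWritable createKey() {
    return new LongWritable();
  }

  public Text createValue() {
    return new Text();
  }

  public long getPos() throws IOException {
    return pos;
  }

  public float getProgress() throws IOException {
    return lines.length == 0 ? 1.0f : (float) pos / lines.length;
  }

  public void close() throws IOException {
    // nothing to release for an in-memory reader
  }
}

Callers follow the same loop that recurs throughout the examples below: create a key and value with createKey()/createValue(), call next(key, value) until it returns false, then close() the reader.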

Project: multiple-dimension-spread    File: MDSHiveLineInputFormat.java
@Override
public RecordReader<NullWritable,ColumnAndIndex> getRecordReader( final InputSplit split, final JobConf job, final Reporter reporter ) throws IOException {
  FileSplit fileSplit = (FileSplit)split;
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem( job );
  long fileLength = fs.getLength( path );
  long start = fileSplit.getStart();
  long length = fileSplit.getLength();
  InputStream in = fs.open( path );
  IJobReporter jobReporter = new HadoopJobReporter( reporter );
  jobReporter.setStatus( String.format( "Read file : %s" , path.toString() ) );
  HiveReaderSetting hiveConfig = new HiveReaderSetting( fileSplit , job );
  if ( hiveConfig.isVectorMode() ){
    IVectorizedReaderSetting vectorizedSetting = new HiveVectorizedReaderSetting( fileSplit , job , hiveConfig );
    return (RecordReader)new MDSHiveDirectVectorizedReader( in , fileLength , start , length , vectorizedSetting , jobReporter );
  }
  else{
    return new MDSHiveLineReader( in , fileLength , start , length , hiveConfig , jobReporter , spreadCounter );
  }
}
Project: angel    File: DFSStorageOldAPI.java
@SuppressWarnings({"rawtypes", "unchecked"})
public void initReader() throws IOException {
  try {
    Configuration conf = WorkerContext.get().getConf();
    String inputFormatClassName =
        conf.get(AngelConf.ANGEL_INPUTFORMAT_CLASS,
            AngelConf.DEFAULT_ANGEL_INPUTFORMAT_CLASS);

    Class<? extends org.apache.hadoop.mapred.InputFormat> inputFormatClass =
        (Class<? extends org.apache.hadoop.mapred.InputFormat>) Class
            .forName(inputFormatClassName);

    org.apache.hadoop.mapred.InputFormat inputFormat =
        ReflectionUtils.newInstance(inputFormatClass,
            new JobConf(conf));

    org.apache.hadoop.mapred.RecordReader<KEY, VALUE> recordReader =
        inputFormat.getRecordReader(split, new JobConf(conf), Reporter.NULL);

    setReader(new DFSReaderOldAPI(recordReader));
  } catch (Exception x) {
    LOG.error("init reader error ", x);
    throw new IOException(x);
  }
}
Project: hadoop    File: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
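
For context, this getSample variant (InputSampler.SplitSampler in the Hadoop source) is typically constructed with a target sample count and a cap on the number of splits to read, and its samples are usually handed to TotalOrderPartitioner via InputSampler.writePartitionFile. The wiring below is a minimal sketch under that assumption; the input path, key/value types and sample sizes are placeholders, not values taken from the Hadoop source.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class SamplerWiring {
  // Sketch only: paths, types and sample sizes are placeholders.
  public static void configure(JobConf job) throws Exception {
    job.setInputFormat(SequenceFileInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/data/input"));

    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/partitions"));

    // Take up to 1000 keys from at most 10 splits and write the partition
    // boundaries that TotalOrderPartitioner loads at runtime.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.SplitSampler<Text, Text>(1000, 10);
    InputSampler.writePartitionFile(job, sampler);
  }
}

The frequency-based getSample variant shown next can be plugged into the same writePartitionFile call.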
Project: hadoop    File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
Project: hadoop    File: TestDBInputFormat.java
/**
 * test DBInputFormat class. Class should split result for chunks
 * @throws Exception
 */
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
  JobConf configuration = new JobConf();
  setupDriver(configuration);

  DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
  format.setConf(configuration);
  format.setConf(configuration);
  DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
  Reporter reporter = mock(Reporter.class);
  RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader(
      splitter, configuration, reporter);

  configuration.setInt(MRJobConfig.NUM_MAPS, 3);
  InputSplit[] lSplits = format.getSplits(configuration, 3);
  assertEquals(5, lSplits[0].getLength());
  assertEquals(3, lSplits.length);

  // test reader. Some simple tests
  assertEquals(LongWritable.class, reader.createKey().getClass());
  assertEquals(0, reader.getPos());
  assertEquals(0, reader.getProgress(), 0.001);
  reader.close();
}
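
The test above constructs DBInputFormat by hand; in a normal job the format is wired up through DBConfiguration.configureDB and DBInputFormat.setInput. The sketch below shows that wiring under the assumption of a plain JDBC source; the driver class, connection URL, table and column names are placeholders.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

public class DBInputWiring {
  // Sketch only: driver, URL, table and columns are placeholders.
  public static void configure(JobConf job) {
    job.setInputFormat(DBInputFormat.class);
    DBConfiguration.configureDB(job,
        "org.hsqldb.jdbc.JDBCDriver",   // JDBC driver class
        "jdbc:hsqldb:mem:testdb");      // connection URL
    // Roughly equivalent to "SELECT id, name FROM employees ORDER BY id"; each
    // row reaches the mapper as a LongWritable row id plus a DBWritable value.
    DBInputFormat.setInput(job, DBInputFormat.NullDBWritable.class,
        "employees",   // table name
        null,          // conditions (WHERE clause), none here
        "id",          // orderBy column
        "id", "name"); // field names
  }
}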
Project: hadoop    File: AutoInputFormat.java
public RecordReader getRecordReader(InputSplit split, JobConf job,
  Reporter reporter) throws IOException {
  FileSplit fileSplit = (FileSplit) split;
  FileSystem fs = FileSystem.get(fileSplit.getPath().toUri(), job);
  FSDataInputStream is = fs.open(fileSplit.getPath());
  byte[] header = new byte[3];
  RecordReader reader = null;
  try {
    is.readFully(header);
  } catch (EOFException eof) {
    reader = textInputFormat.getRecordReader(split, job, reporter);
  } finally {
    is.close();
  }
  if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
    reader = seqFileInputFormat.getRecordReader(split, job, reporter);
  } else {
    reader = textInputFormat.getRecordReader(split, job, reporter);
  }
  return reader;
}
Project: hadoopcryptoledger    File: EthereumFormatHadoopTest.java
@Test
public void readEthereumBlockInputFormatBlock1346406Bzip2Compressed() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin.bz2";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for genesis block");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned  null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");

  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
Project: hadoopoffice    File: OfficeFormatHadoopExcelTest.java
@Test
public void readExcelInputFormatExcel2013SingleSheetEncryptedNegativeLowFootprint() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2013encrypt.xlsx";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.read.locale.bcp47", "de");
  // low footprint
  job.set("hadoopoffice.read.lowFootprint", "true");
  // for decryption simply set the password
  job.set("hadoopoffice.read.security.crypt.password", "test2");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNull(reader, "Null record reader implies invalid password");
}
Project: drill    File: HiveAbstractReader.java
/**
 * Initializes the next reader if available, closing the previous reader if any.
 *
 * @param job map / reduce job configuration.
 * @return true if a new reader was initialized, false if no more readers are available
 * @throws ExecutionSetupException if the record reader could not be initialized
 */
protected boolean initNextReader(JobConf job) throws ExecutionSetupException {
  if (inputSplitsIterator.hasNext()) {
    if (reader != null) {
      closeReader();
    }
    InputSplit inputSplit = inputSplitsIterator.next();
    try {
      reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
      logger.trace("hive reader created: {} for inputSplit {}", reader.getClass().getName(), inputSplit.toString());
    } catch (Exception e) {
      throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
    }
    return true;
  }
  return false;
}
Project: aliyun-oss-hadoop-fs    File: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
Project: aliyun-oss-hadoop-fs    File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
Project: emr-dynamodb-connector    File: ImportRecordReaderFactory.java
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
    InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
  // CombineFileSplit indicates the new export format which includes a manifest file
  if (inputSplit instanceof CombineFileSplit) {
    int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
    if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
      throw new IOException("Unknown version: " + job.get(DynamoDBConstants
          .EXPORT_FORMAT_VERSION));
    }
    return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
  } else if (inputSplit instanceof FileSplit) {
    // FileSplit indicates the old data pipeline format which doesn't include a manifest file
    Path path = ((FileSplit) inputSplit).getPath();
    return new ImportRecordReader(job, path);
  } else {
    throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:"
        + " " + inputSplit.getClass());
  }
}
Project: hadoopcryptoledger    File: EthereumFormatHadoopTest.java
@Test
public void readEthereumBlockInputFormatBlock1346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for genesis block");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned  null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");
  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
Project: big-c    File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
Project: hadoopoffice    File: OfficeFormatHadoopExcelTest.java
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegativeLowFootprint() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003encrypt.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.read.locale.bcp47", "de");
  // low footprint
  job.set("hadoopoffice.read.lowFootprint", "true");
  // for decryption simply set the password
  job.set("hadoopoffice.read.security.crypt.password", "test2");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNull(reader, "Null record reader implies invalid password");
}
Project: hadoopoffice    File: OfficeFormatHadoopExcelTest.java
@Test
public void readExcelInputFormatExcel2003Empty() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003empty.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.locale.bcp47", "de");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned  null RecordReader");
  Text spreadSheetKey = new Text();
  ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
  assertTrue(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains row 1");
  assertEquals(0, spreadSheetValue.get().length, "Input Split for Excel file contain row 1 and is empty");
  assertFalse(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains no further row");
}
Project: hadoopoffice    File: OfficeFormatHadoopExcelTest.java
@Test
public void readExcelInputFormatExcel2013Empty() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2013empty.xlsx";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.read.locale.bcp47", "de");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned  null RecordReader");
  Text spreadSheetKey = new Text();
  ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
  assertTrue(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains row 1");
  assertEquals(0, spreadSheetValue.get().length, "Input Split for Excel file contain row 1 and is empty");
  assertFalse(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains no further row");
}
Project: hadoopcryptoledger    File: EthereumFormatHadoopTest.java
@Test
public void readEthereumBlockInputFormatBlock3346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth3346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for genesis block");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned  null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 3346406 contains at least one block");
  assertEquals(7, block.getEthereumTransactions().size(), "Block 3346406 must have 7 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 3346406");
  reader.close();
}
Project: hadoop    File: DummyInputFormat.java
public RecordReader<Object, Object> getRecordReader(InputSplit split,
    JobConf job, Reporter reporter) throws IOException {
  return new RecordReader<Object, Object>() {

    boolean once = false;

    public boolean next(Object key, Object value) throws IOException {
      if (!once) {
        once = true;
        return true;
      }
      return false;
    }

    public Object createKey() {
      return new Object();
    }

    public Object createValue() {
      return new Object();
    }

    public long getPos() throws IOException {
      return 0L;
    }

    public void close() throws IOException {
    }

    public float getProgress() throws IOException {
      return 0.0f;
    }
  };
}
Project: hadoop    File: LoadGeneratorMR.java
public RecordReader<LongWritable, Text> getRecordReader(
    InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {

  return new RecordReader<LongWritable, Text>() {

    boolean sentOneRecord = false;

    public boolean next(LongWritable key, Text value)
        throws IOException {
      key.set(1);
      value.set("dummy");
      if (sentOneRecord == false) { // first call
        sentOneRecord = true;
        return true;
      }
      return false; // we have sent one record - we are done
    }

    public LongWritable createKey() {
      return new LongWritable();
    }
    public Text createValue() {
      return new Text();
    }
    public long getPos() throws IOException {
      return 1;
    }
    public void close() throws IOException {
    }
    public float getProgress() throws IOException {
      return 1;
    }
  };
}
Project: hadoop    File: TestDatamerge.java
public RecordReader<K,V> getRecordReader(
    InputSplit ignored, JobConf conf, Reporter reporter) {
  return new RecordReader<K,V>() {
    public boolean next(K key, V value) throws IOException { return false; }
    public K createKey() {
      return ReflectionUtils.newInstance(keyclass, null);
    }
    public V createValue() {
      return ReflectionUtils.newInstance(valclass, null);
    }
    public long getPos() throws IOException { return 0L; }
    public void close() throws IOException { }
    public float getProgress() throws IOException { return 0.0f; }
  };
}
Project: hadoop    File: OutputHandler.java
/**
 * Create a handler that will handle any records output from the application.
 * @param collector the "real" collector that takes the output
 * @param reporter the reporter for reporting progress
 */
public OutputHandler(OutputCollector<K, V> collector, Reporter reporter, 
                     RecordReader<FloatWritable,NullWritable> recordReader,
                     String expectedDigest) {
  this.reporter = reporter;
  this.collector = collector;
  this.recordReader = recordReader;
  this.expectedDigest = expectedDigest;
}
Project: hadoop    File: MultiFilterRecordReader.java
/** {@inheritDoc} */
@SuppressWarnings("unchecked") // Explicit check for value class agreement
public V createValue() {
  if (null == valueclass) {
    final Class<?> cls = kids[0].createValue().getClass();
    for (RecordReader<K,? extends V> rr : kids) {
      if (!cls.equals(rr.createValue().getClass())) {
        throw new ClassCastException("Child value classes fail to agree");
      }
    }
    valueclass = cls.asSubclass(Writable.class);
    ivalue = createInternalValue();
  }
  return (V) ReflectionUtils.newInstance(valueclass, null);
}
Project: hadoop    File: CompositeRecordReader.java
/**
 * Create a new key value common to all child RRs.
 * @throws ClassCastException if key classes differ.
 */
@SuppressWarnings("unchecked") // Explicit check for key class agreement
public K createKey() {
  if (null == keyclass) {
    final Class<?> cls = kids[0].createKey().getClass();
    for (RecordReader<K,? extends Writable> rr : kids) {
      if (!cls.equals(rr.createKey().getClass())) {
        throw new ClassCastException("Child key classes fail to agree");
      }
    }
    keyclass = cls.asSubclass(WritableComparable.class);
  }
  return (K) ReflectionUtils.newInstance(keyclass, getConf());
}
Project: hadoop    File: CompositeRecordReader.java
/**
 * Close all child RRs.
 */
public void close() throws IOException {
  if (kids != null) {
    for (RecordReader<K,? extends Writable> rr : kids) {
      rr.close();
    }
  }
  if (jc != null) {
    jc.close();
  }
}
Project: hadoop    File: CompositeRecordReader.java
/**
 * Report progress as the minimum of all child RR progress.
 */
public float getProgress() throws IOException {
  float ret = 1.0f;
  for (RecordReader<K,? extends Writable> rr : kids) {
    ret = Math.min(ret, rr.getProgress());
  }
  return ret;
}
Project: hadoop    File: NLineInputFormat.java
public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit,
                                          JobConf job,
                                          Reporter reporter) 
throws IOException {
  reporter.setStatus(genericSplit.toString());
  return new LineRecordReader(job, (FileSplit) genericSplit);
}
Project: hadoop    File: DelegatingInputFormat.java
@SuppressWarnings("unchecked")
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
    Reporter reporter) throws IOException {

  // Find the InputFormat and then the RecordReader from the
  // TaggedInputSplit.

  TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
     .newInstance(taggedInputSplit.getInputFormatClass(), conf);
  return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
     reporter);
}
Project: hadoop    File: DBInputFormat.java
/** {@inheritDoc} */
public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
    JobConf job, Reporter reporter) throws IOException {

  // wrap the DBRR in a shim class to deal with API differences.
  return new DBRecordReaderWrapper<T>(
      (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>) 
      createDBRecordReader(
        (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
}
Project: hadoop    File: PipeMapRunner.java
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
       throws IOException {
  PipeMapper pipeMapper = (PipeMapper)getMapper();
  pipeMapper.startOutputThreads(output, reporter);
  super.run(input, output, reporter);
}
Project: hadoop    File: DumpTypedBytes.java
/**
 * Dump given list of files to standard output as typed bytes.
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf()); 
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
      fileStatus.getLen() * fileStatus.getBlockSize(),
      (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
Project: dremio-oss    File: HiveTextReader.java
protected SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
  this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
  this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
  this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
  logger.debug("skipRecordInspector: fileFormat {}, headerCount {}, footerCount {}", this.fileFormats,
      this.headerCount, this.footerCount);
  this.footerBuffer = Lists.newLinkedList();
  this.continuance = false;
  this.holderIndex = -1;
  this.valueHolder = initializeValueHolder(reader, footerCount);
  this.actualCount = 0;
  this.tempCount = 0;
}
Project: dremio-oss    File: HiveRecordReaders.java
@Override
public int populateData() throws IOException, SerDeException {
  final RecordReader<Object, Object> reader = this.reader;
  final Converter partTblObjectInspectorConverter = this.partTblObjectInspectorConverter;
  final int numRowsPerBatch = (int) this.numRowsPerBatch;

  final StructField[] selectedStructFieldRefs = this.selectedStructFieldRefs;
  final SerDe partitionSerDe = this.partitionSerDe;
  final StructObjectInspector finalOI = this.finalOI;
  final ObjectInspector[] selectedColumnObjInspectors = this.selectedColumnObjInspectors;
  final HiveFieldConverter[] selectedColumnFieldConverters = this.selectedColumnFieldConverters;
  final ValueVector[] vectors = this.vectors;
  final Object key = this.key;
  final Object value = this.value;

  int recordCount = 0;
  while (recordCount < numRowsPerBatch && reader.next(key, value)) {
    Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
    if (partTblObjectInspectorConverter != null) {
      deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
    }
    for (int i = 0; i < selectedStructFieldRefs.length; i++) {
      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs[i]);
      if (hiveValue != null) {
        selectedColumnFieldConverters[i].setSafeValue(selectedColumnObjInspectors[i], hiveValue, vectors[i], recordCount);
      }
    }        
    recordCount++;
  }

  return recordCount;
}
Project: ditb    File: TestTableSnapshotInputFormat.java
private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
  TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
  InputSplit[] splits = tsif.getSplits(job, 0);

  Assert.assertEquals(expectedNumSplits, splits.length);

  HBaseTestingUtility.SeenRowTracker rowTracker =
    new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

  for (int i = 0; i < splits.length; i++) {
    // validate input split
    InputSplit split = splits[i];
    Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);

    // validate record reader
    OutputCollector collector = mock(OutputCollector.class);
    Reporter reporter = mock(Reporter.class);
    RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

    // validate we can read all the data back
    ImmutableBytesWritable key = rr.createKey();
    Result value = rr.createValue();
    while (rr.next(key, value)) {
      verifyRowFromMap(key, value);
      rowTracker.addRow(key.copyBytes());
    }

    rr.close();
  }

  // validate all rows are seen
  rowTracker.validate();
}
Project: aliyun-oss-hadoop-fs    File: LoadGeneratorMR.java
public RecordReader<LongWritable, Text> getRecordReader(
    InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {

  return new RecordReader<LongWritable, Text>() {

    boolean sentOneRecord = false;

    public boolean next(LongWritable key, Text value)
        throws IOException {
      key.set(1);
      value.set("dummy");
      if (sentOneRecord == false) { // first call
        sentOneRecord = true;
        return true;
      }
      return false; // we have sent one record - we are done
    }

    public LongWritable createKey() {
      return new LongWritable();
    }
    public Text createValue() {
      return new Text();
    }
    public long getPos() throws IOException {
      return 1;
    }
    public void close() throws IOException {
    }
    public float getProgress() throws IOException {
      return 1;
    }
  };
}
Project: aliyun-oss-hadoop-fs    File: TestDatamerge.java
public RecordReader<K,V> getRecordReader(
    InputSplit ignored, JobConf conf, Reporter reporter) {
  return new RecordReader<K,V>() {
    public boolean next(K key, V value) throws IOException { return false; }
    public K createKey() {
      return ReflectionUtils.newInstance(keyclass, null);
    }
    public V createValue() {
      return ReflectionUtils.newInstance(valclass, null);
    }
    public long getPos() throws IOException { return 0L; }
    public void close() throws IOException { }
    public float getProgress() throws IOException { return 0.0f; }
  };
}
Project: aliyun-oss-hadoop-fs    File: OutputHandler.java
/**
 * Create a handler that will handle any records output from the application.
 * @param collector the "real" collector that takes the output
 * @param reporter the reporter for reporting progress
 */
public OutputHandler(OutputCollector<K, V> collector, Reporter reporter, 
                     RecordReader<FloatWritable,NullWritable> recordReader,
                     String expectedDigest) {
  this.reporter = reporter;
  this.collector = collector;
  this.recordReader = recordReader;
  this.expectedDigest = expectedDigest;
}
Project: aliyun-oss-hadoop-fs    File: CompositeRecordReader.java
/**
 * Create a new key value common to all child RRs.
 * @throws ClassCastException if key classes differ.
 */
@SuppressWarnings("unchecked") // Explicit check for key class agreement
public K createKey() {
  if (null == keyclass) {
    final Class<?> cls = kids[0].createKey().getClass();
    for (RecordReader<K,? extends Writable> rr : kids) {
      if (!cls.equals(rr.createKey().getClass())) {
        throw new ClassCastException("Child key classes fail to agree");
      }
    }
    keyclass = cls.asSubclass(WritableComparable.class);
  }
  return (K) ReflectionUtils.newInstance(keyclass, getConf());
}
Project: aliyun-oss-hadoop-fs    File: CompositeRecordReader.java
/**
 * Close all child RRs.
 */
public void close() throws IOException {
  if (kids != null) {
    for (RecordReader<K,? extends Writable> rr : kids) {
      rr.close();
    }
  }
  if (jc != null) {
    jc.close();
  }
}
Project: aliyun-oss-hadoop-fs    File: CompositeRecordReader.java
/**
 * Report progress as the minimum of all child RR progress.
 */
public float getProgress() throws IOException {
  float ret = 1.0f;
  for (RecordReader<K,? extends Writable> rr : kids) {
    ret = Math.min(ret, rr.getProgress());
  }
  return ret;
}