Java class org.apache.hadoop.mapred.lib.MultipleOutputs: example source code

The snippets below show how several open-source projects register and write named outputs with the legacy org.apache.hadoop.mapred.lib.MultipleOutputs API (the old mapred API, configured through a JobConf).

Project: systemml    File: MRJobConfiguration.java
public static CollectMultipleConvertedOutputs getMultipleConvertedOutputs(JobConf job)
{       
    byte[] resultIndexes=MRJobConfiguration.getResultIndexes(job);
    Converter[] outputConverters=new Converter[resultIndexes.length];
    MatrixCharacteristics[] stats=new MatrixCharacteristics[resultIndexes.length];
    HashMap<Byte, ArrayList<Integer>> tagMapping=new HashMap<>();
    for(int i=0; i<resultIndexes.length; i++)
    {
        byte output=resultIndexes[i];
        ArrayList<Integer> vec=tagMapping.get(output);
        if(vec==null)
        {
            vec=new ArrayList<>();
            tagMapping.put(output, vec);
        }
        vec.add(i);

        outputConverters[i]=getOuputConverter(job, i);
        stats[i]=MRJobConfiguration.getMatrixCharacteristicsForOutput(job, output);
    }

    MultipleOutputs multipleOutputs=new MultipleOutputs(job);

    return new CollectMultipleConvertedOutputs(outputConverters, stats, multipleOutputs);

}
Project: hadoop-consumer    File: KafkaETLJob.java
/**
 * Create a job configuration
 */
@SuppressWarnings("rawtypes")
public static JobConf createJobConf(String name, String topic, Props props, Class classobj) 
throws Exception {
    JobConf conf = getJobConf(name, props, classobj);

    conf.set("topic", topic);

    // input format
    conf.setInputFormat(KafkaETLInputFormat.class);

    //turn off mapper speculative execution
    conf.setMapSpeculativeExecution(false);

    // setup multiple outputs
    MultipleOutputs.addMultiNamedOutput(conf, "offsets", SequenceFileOutputFormat.class, 
                KafkaETLKey.class, BytesWritable.class);


    return conf;
}
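The "offsets" output above is registered with addMultiNamedOutput, so a task writes to it through the three-argument getCollector(name, multiName, reporter) call, which creates one output file per multiName. The mapper below is only a sketch of that pattern, not code from hadoop-consumer: the class name OffsetsWritingMapper and the multiName "1" are invented for illustration, and the kafka.etl package for KafkaETLKey is assumed.

import java.io.IOException;

import kafka.etl.KafkaETLKey;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class OffsetsWritingMapper extends MapReduceBase
        implements Mapper<KafkaETLKey, BytesWritable, KafkaETLKey, BytesWritable> {

    private MultipleOutputs mos;

    @Override
    public void configure(JobConf job) {
        // One MultipleOutputs handle per task, built from the JobConf prepared by createJobConf().
        mos = new MultipleOutputs(job);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void map(KafkaETLKey key, BytesWritable value,
            OutputCollector<KafkaETLKey, BytesWritable> output, Reporter reporter)
            throws IOException {
        // Regular job output.
        output.collect(key, value);
        // Side output: the multiName ("1" here) becomes part of the file name,
        // e.g. offsets_1-m-00000 for the registered SequenceFileOutputFormat.
        mos.getCollector("offsets", "1", reporter).collect(key, value);
    }

    @Override
    public void close() throws IOException {
        // Closing MultipleOutputs flushes every named-output RecordWriter.
        mos.close();
    }
}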
Project: Acacia    File: EdgeDistributor.java
@Override
public void configure(JobConf conf) {
    multipleOutputs = new MultipleOutputs(conf);
    currentTable = conf.get("org.acacia.partitioner.hbase.table");
    zookeeperhost = getZookeeperLocation();
    contactHost = conf.get("org.acacia.partitioner.index.contacthost");
    totalVertexCount = Long.parseLong(conf.get("vert-count"));
    initalPartitionID = Integer.parseInt(conf.get("initpartition-id"));
    zeroFlag = Boolean.parseBoolean(conf.get("zero-flag"));
    loadIndex();
}
Project: Acacia    File: EdgelistPartitioner.java
@SuppressWarnings("unused")
public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(EdgelistPartitioner.class);

    if (conf == null) {
        return;
    }
    String dir1 = "/user/miyuru/merged";
    String dir2 = "/user/miyuru/merged-out";

    // We first delete the temporary directories if they exist on the HDFS
    FileSystem fs1 = FileSystem.get(new JobConf());
    // only delete dir2 because dir1 is uploaded externally.
    if (fs1.exists(new Path(dir2))) {
        fs1.delete(new Path(dir2), true);
    }

    conf.setInputFormat(WholeFileInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    WholeFileInputFormat.setInputPaths(conf, new Path(dir1));
    SequenceFileOutputFormat.setOutputPath(conf, new Path(dir2));

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(SequenceFileMapper.class);
    conf.setReducerClass(MultipleOutputsInvertedReducer.class);
    conf.setOutputFormat(NullOutputFormat.class);

    conf.setJobName("EdgelistPartitioner");

    MultipleOutputs.addMultiNamedOutput(conf, "partition",
            TextOutputFormat.class, NullWritable.class, Text.class);

    JobClient.runJob(conf);
}
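Because "partition" is registered with addMultiNamedOutput, a reducer can fan edges out into one file per graph partition simply by varying the multiName argument. The fragment below is a hedged sketch of that idea, not the actual MultipleOutputsInvertedReducer: mos stands for a MultipleOutputs field created in configure() (as in the later EdgelistPartitioner snippet), and partitionId and edgeLine are hypothetical local variables.

// Illustrative only: route one edge line to the file for its partition.
// The multiName (here the partition id) selects the target file, so each
// partition ends up in its own partition_<id>-r-xxxxx output.
mos.getCollector("partition", String.valueOf(partitionId), reporter)
   .collect(NullWritable.get(), new Text(edgeLine));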
Project: systemml    File: MRJobConfiguration.java
public static void setUpMultipleOutputs(JobConf job, byte[] resultIndexes, byte[] resultDimsUnknown, String[] outputs, 
        OutputInfo[] outputInfos, boolean inBlockRepresentation, boolean mayContainCtable) 
throws Exception
{
    if(resultIndexes.length!=outputs.length)
        throw new Exception("number of outputs and result indexes do not match");
    if(outputs.length!=outputInfos.length)
        throw new Exception("number of outputs and outputInfos do not match");

    job.set(RESULT_INDEXES_CONFIG, MRJobConfiguration.getIndexesString(resultIndexes));
    job.set(RESULT_DIMS_UNKNOWN_CONFIG, MRJobConfiguration.getIndexesString(resultDimsUnknown));
    job.setStrings(OUTPUT_MATRICES_DIRS_CONFIG, outputs);
    job.setOutputCommitter(MultipleOutputCommitter.class);

    for(int i=0; i<outputs.length; i++)
    {
        MapReduceTool.deleteFileIfExistOnHDFS(new Path(outputs[i]), job);
        if ( mayContainCtable && resultDimsUnknown[i] == (byte) 1 )  {
            setOutputInfo(job, i, outputInfos[i], false);
        }
        else {
            setOutputInfo(job, i, outputInfos[i], inBlockRepresentation);
        }
        MultipleOutputs.addNamedOutput(job, Integer.toString(i), 
                outputInfos[i].outputFormatClass, outputInfos[i].outputKeyClass, 
                outputInfos[i].outputValueClass);
    }
    job.setOutputFormat(NullOutputFormat.class);

    // configure temp output
    Path tempOutputPath = new Path( constructTempOutputFilename() );
    FileOutputFormat.setOutputPath(job, tempOutputPath);
    MapReduceTool.deleteFileIfExistOnHDFS(tempOutputPath, job);
}
Project: systemml    File: CollectMultipleConvertedOutputs.java
public CollectMultipleConvertedOutputs(Converter[] convts, MatrixCharacteristics[] stats, 
        MultipleOutputs outputs)
{
    outputConverters=convts;
    multipleOutputs=outputs;
    matrixStats=stats;
}
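The constructor above only stores the converters, the matrix statistics, and the MultipleOutputs handle; routing then relies on setUpMultipleOutputs (shown earlier) having registered the i-th result under the name Integer.toString(i). A minimal sketch of such a routing method, with hypothetical parameter names and without SystemML's converter logic:

// Hedged sketch, not SystemML's actual collect method: emit one already
// converted record to the named output registered for its result index.
@SuppressWarnings("unchecked")
public void collectOutput(Writable outKey, Writable outValue, int resultIndex, Reporter reporter)
    throws IOException
{
    // Named outputs were added as "0", "1", ... in setUpMultipleOutputs,
    // so the result index doubles as the named-output name.
    multipleOutputs.getCollector(Integer.toString(resultIndex), reporter)
                   .collect(outKey, outValue);
}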
Project: hadoop-consumer    File: KafkaETLContext.java
/**
 * construct using input string
 */
@SuppressWarnings("unchecked")
public KafkaETLContext(JobConf job, Props props, Reporter reporter, 
                                MultipleOutputs mos, int index, String input) 
throws Exception {

    _bufferSize = getClientBufferSize(props);
    _timeout = getClientTimeout(props);
    System.out.println("bufferSize=" +_bufferSize);
    System.out.println("timeout=" + _timeout);
    _reporter = reporter;
    _mos = mos;

    // read topic and current offset from input
    _index= index; 
    _input = input;
    _request = new KafkaETLRequest(input.trim());

    // read data from queue
    URI uri = _request.getURI();
    _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize);

    // get available offset range
    _offsetRange = getOffsetRange();
    System.out.println("Connected to node " + uri 
            + " beginning reading at offset " + _offsetRange[0]
            + " latest offset=" + _offsetRange[1]);

    _offset = _offsetRange[0];
    _count = 0;
    _requestTime = 0;
    _retry = 0;

    _startTime = System.currentTimeMillis();
}
Project: hadoop-consumer    File: KafkaETLRecordReader.java
public KafkaETLRecordReader(InputSplit split, JobConf job, Reporter reporter) 
throws IOException {
   super(job, (FileSplit) split);

   _props = KafkaETLUtils.getPropsFromJob(job);
   _contextList = new ArrayList<KafkaETLContext>();
   _job = job;
   _reporter = reporter;
   _contextIndex = -1;
   _mos = new MultipleOutputs(job);
   try {
       _limit = _props.getInt("kafka.request.limit", -1);

       /* get attempt id */
       String taskId = _job.get("mapred.task.id");
       if (taskId == null) {
           throw new IllegalArgumentException(
                             "Configuration does not contain the property mapred.task.id");
       }
       String[] parts = taskId.split("_");
       if (    parts.length != 6 || !parts[0].equals("attempt") 
            || (!"m".equals(parts[3]) && !"r".equals(parts[3]))) {
               throw new IllegalArgumentException(
                             "TaskAttemptId string : " + taskId + " is not properly formed");
       }
      _attemptId = parts[4]+parts[3];
   }catch (Exception e) {
       throw new IOException (e);
   }
}
Project: Acacia    File: EdgeDistributor.java
public static void main(String[] args) throws IOException,
        InterruptedException, ClassNotFoundException {
    String dir1 = "/user/miyuru/input";
    String dir2 = "/user/miyuru/edgedistributed-out";

    // We first delete the temporary directories if they exist on the HDFS
    FileSystem fs1 = FileSystem.get(new JobConf());
    if (fs1.exists(new Path(dir2))) {
        fs1.delete(new Path(dir2), true);
    }

    // First job scans through the edge list and splits the edges in to
    // separate files based on the partitioned vertex files.

    JobConf conf = new JobConf(EdgeDistributor.class);
    conf.set("org.acacia.partitioner.hbase.zookeeper.quorum", args[0]);
    conf.set("org.acacia.partitioner.hbase.table", args[1]);
    conf.set("org.acacia.partitioner.index.contacthost", args[2]);
    conf.set("vert-count", args[3]);
    conf.set("initpartition-id", args[4]);
    conf.set("zero-flag", args[5]);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
    conf.setMapperClass(FileMapper.class);
    conf.setReducerClass(FileReducer.class);
    // conf.setInputFormat(TextInputFormat.class);
    conf.setInputFormat(NLinesInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setNumReduceTasks(96); // Need to specify the number of reduce
                                // tasks explicitly. Otherwise it creates
                                // only one reduce task.

    FileInputFormat.setInputPaths(conf, new Path(dir1));
    FileOutputFormat.setOutputPath(conf, new Path(dir2));

    MultipleOutputs.addMultiNamedOutput(conf, "partition",
            TextOutputFormat.class, NullWritable.class, Text.class);

    Job job = new Job(conf, "EdgeDistributor");
    job.waitForCompletion(true);

    System.out.println("Done job EdgeDistribution");
}
Project: Acacia    File: EdgelistPartitioner.java
@Override
public void configure(JobConf conf) {
    multipleOutputs = new MultipleOutputs(conf);
}
Project: Chi-FRBCS-BigDataCS    File: IOUtils.java
public MultipleOutputsCloseableAdapter(MultipleOutputs mo) {
  this.mo = mo;
}
Project: Chi-FRBCS-BigData-Ave    File: IOUtils.java
public MultipleOutputsCloseableAdapter(MultipleOutputs mo) {
  this.mo = mo;
}
Project: Chi-FRBCS-BigData-Max    File: IOUtils.java
public MultipleOutputsCloseableAdapter(MultipleOutputs mo) {
  this.mo = mo;
}
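All three Chi-FRBCS variants wrap MultipleOutputs in the same Closeable adapter, of which only the constructor is shown. What makes the adapter useful is a close() that delegates to MultipleOutputs.close(); the version below sketches that delegation and is not necessarily what those IOUtils classes do verbatim.

// Hedged sketch of the delegating close(); the real adapters may add
// null checks or swallow exceptions for "close quietly" semantics.
@Override
public void close() throws IOException {
    // Flushes and closes every named-output RecordWriter opened by this task.
    mo.close();
}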
Project: WikipediaMiner    File: RedirectStep.java
@Override
public int run(String[] args) throws Exception {

    JobConf conf = new JobConf(RedirectStep.class);
    DumpExtractor.configureJob(conf, args) ;

    conf.setJobName("WM: resolve redirects");

    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputValueClass(DbIntList.class);      

    conf.setMapperClass(Step2Mapper.class);
    conf.setCombinerClass(Step2Reducer.class) ;
    conf.setReducerClass(Step2Reducer.class) ;

    // set up input

    conf.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(conf.get(DumpExtractor.KEY_OUTPUT_DIR) + "/" + DumpExtractor.getDirectoryName(ExtractionStep.page) + "/" + PageStep.Output.tempRedirect + "*"));

    //set up output

    conf.setOutputFormat(RedirectOutputFormat.class);
    FileOutputFormat.setOutputPath(conf, new Path(conf.get(DumpExtractor.KEY_OUTPUT_DIR) + "/" + DumpExtractor.getDirectoryName(ExtractionStep.redirect)));

    //set up distributed cache

    DistributedCache.addCacheFile(new Path(conf.get(DumpExtractor.KEY_OUTPUT_DIR) + "/" + DumpExtractor.OUTPUT_SITEINFO).toUri(), conf);
    DistributedCache.addCacheFile(new Path(conf.get(DumpExtractor.KEY_LANG_FILE)).toUri(), conf);

    //cache page files created in previous step, so we can look up pages by title
    Path pageStepPath = new Path(conf.get(DumpExtractor.KEY_OUTPUT_DIR) + "/" + DumpExtractor.getDirectoryName(ExtractionStep.page)) ;
    for (FileStatus fs:FileSystem.get(conf).listStatus(pageStepPath)) {

        if (fs.getPath().getName().startsWith(PageStep.Output.tempPage.name())) {
            Logger.getLogger(RedirectStep.class).info("Cached page file " + fs.getPath()) ;
            DistributedCache.addCacheFile(fs.getPath().toUri(), conf);
        }
    }

    MultipleOutputs.addNamedOutput(conf, Output.redirectTargetsBySource.name(), TextOutputFormat.class,
            IntWritable.class, IntWritable.class);

    conf.set("mapred.textoutputformat.separator", ",");

    //run job
    JobClient.runJob(conf);
    return 0;
}
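Records reach the redirectTargetsBySource named output from inside a map or reduce task along the following lines; because the separator is set to "," and the named output uses TextOutputFormat, each emitted pair is written as source,target. This fragment is illustrative only: mos is a MultipleOutputs field such as the one created in the configure() shown next, and sourceId and resolvedTargetId are hypothetical variables.

// Hedged sketch, not WikipediaMiner's actual logic: write one
// (source id, resolved target id) pair to the named output from run().
mos.getCollector(Output.redirectTargetsBySource.name(), reporter)
   .collect(new IntWritable(sourceId), new IntWritable(resolvedTargetId));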
Project: WikipediaMiner    File: RedirectStep.java
@Override
public void configure(JobConf job) {

    HashSet<PageType> pageTypesToCache = new HashSet<PageType>() ;
    pageTypesToCache.add(PageType.article) ;
    pageTypesToCache.add(PageType.redirect) ;
    pageTypesToCache.add(PageType.disambiguation) ;

    try {

        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);

        for (Path cf:cacheFiles) {

            if (cf.getName().equals(new Path(DumpExtractor.OUTPUT_SITEINFO).getName())) {
                si = new SiteInfo(cf) ;
            }

            if (cf.getName().equals(new Path(job.get(DumpExtractor.KEY_LANG_FILE)).getName())) {
                lc = new LanguageConfiguration(job.get(DumpExtractor.KEY_LANG_CODE), cf) ;
            }

            if (cf.getName().startsWith(PageStep.Output.tempPage.name())) {
                Logger.getLogger(Step2Mapper.class).info("Located cached page file " + cf.toString()) ;
                pageFiles.add(cf) ;
            }
        }

        if (si == null) 
            throw new Exception("Could not locate '" + DumpExtractor.OUTPUT_SITEINFO + "' in DistributedCache") ;

        if (lc == null) 
            throw new Exception("Could not locate '" + job.get(DumpExtractor.KEY_LANG_FILE) + "' in DistributedCache") ;

        if (pageFiles.isEmpty())
            throw new Exception("Could not gather page summary files produced in step 1") ;

        mos = new MultipleOutputs(job);

    } catch (Exception e) {
        Logger.getLogger(Step2Mapper.class).error("Could not configure mapper", e);
        System.exit(1) ;
    }
}