@Override
protected void jobSetup(Job job) throws IOException, ImportException {
  super.jobSetup(job);

  // We shouldn't have gotten here if the bulk load dir is not set,
  // so throw an ImportException.
  if (getContext().getDestination() == null) {
    throw new ImportException("Can't run HBaseBulkImportJob without a "
        + "valid destination directory.");
  }

  TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
  FileOutputFormat.setOutputPath(job, getContext().getDestination());

  HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
  HFileOutputFormat.configureIncrementalLoad(job, hTable);
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  conf.set("hbase.table.name", args[2]);

  Job job = new Job(conf, "createipas");
  job.setJarByClass(CreateIpAS.class);

  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  HTable hTable = new HTable(conf, args[2]);
  HFileOutputFormat.configureIncrementalLoad(job, hTable);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.waitForCompletion(true);
}
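// The job above only writes HFiles under args[1]; they still have to be handed off to the
// region servers. A minimal sketch of that follow-up step, assuming the same conf, output
// path, and table name as in the snippet above (not part of the original code):
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(new Path(args[1]), new HTable(conf, args[2]));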
@Override
public void open(int taskNumber, int numTasks) throws IOException {
  this.hadoopConfig = getHadoopConfig(this.config);

  /*
   * PLEASE NOTE:
   * If you are an Eclipse+Maven integration user and you see two (or more) warnings here,
   * please close the pact-hbase project OR set the Maven profile to hadoop_yarn.
   *
   * pact-hbase requires hadoop_yarn, but Eclipse is not able to parse Maven profiles properly.
   * Therefore, it imports the pact-hbase project even if it is not included in the standard
   * profile (hadoop_v1).
   */
  final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP,
      taskNumber - 1, 0);
  this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig,
      attemptId);

  final HFileOutputFormat outFormat = new HFileOutputFormat();
  try {
    this.writer = outFormat.getRecordWriter(this.context);
  } catch (InterruptedException iex) {
    throw new IOException("Opening the writer was interrupted.", iex);
  }
}
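// Hedged sketch: in a wrapper like the one above, the matching close() typically just
// delegates to the Hadoop record writer. Field names mirror open() and are assumptions;
// the original snippet does not include this method.
@Override
public void close() throws IOException {
  try {
    this.writer.close(this.context);
  } catch (InterruptedException iex) {
    throw new IOException("Closing the writer was interrupted.", iex);
  }
}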
@SuppressWarnings("rawtypes") private Job startBulkLoad(Configuration conf, String inputTable, String tableName, Class<? extends TableMapper> clazz, Path outputDir) throws Exception { // Create our job to bulk load into HBase Job job = Job.getInstance(conf, "HBase Bulk Loader"); job.setJarByClass(getClass()); // Initialize our mapper by specifying the input table TableMapReduceUtil.initTableMapperJob(inputTable, new Scan(), clazz, ImmutableBytesWritable.class, KeyValue.class, job); HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, tableName)); HFileOutputFormat.setOutputPath(job, outputDir); // launch the job job.waitForCompletion(true); return job; }
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  conf.set("hbase.table.name", TABLE_NAME);

  Job job = new Job(conf);
  job.setJarByClass(Q2Loader.class);

  /* set mapper and reducer keys and values */
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  /* set the output of the job to be in HFile format */
  HTable hTable = new HTable(conf, TABLE_NAME);
  HFileOutputFormat.configureIncrementalLoad(job, hTable);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  conf.set("hbase.table.name", TABLE_NAME);

  Job job = new Job(conf);
  job.setJarByClass(Q3Loader.class);

  // set mapper and reducer keys and values
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  // set the output of the job to be in HFile format
  HTable hTable = new HTable(conf, TABLE_NAME);
  HFileOutputFormat.configureIncrementalLoad(job, hTable);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  conf.set("hbase.table.name", TABLE_NAME);

  Job job = new Job(conf);
  job.setJarByClass(Q4Loader.class);

  /* set mapper and reducer keys and values */
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  /* set the output of the job to be in HFile format */
  HTable hTable = new HTable(conf, TABLE_NAME);
  HFileOutputFormat.configureIncrementalLoad(job, hTable);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  // conf.set("hbase.table.name", TABLE_NAME);

  Job job = new Job(conf);
  job.setJarByClass(HBaseLoader.class);

  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  HTable hTable = new HTable(conf, TABLE_NAME);
  HFileOutputFormat.configureIncrementalLoad(job, hTable);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.waitForCompletion(true);
}
public int run(String[] args) throws Exception {
  Options options = new Options();
  try {
    options.addOption(OPTION_JOB_NAME);
    options.addOption(OPTION_II_NAME);
    options.addOption(OPTION_INPUT_PATH);
    options.addOption(OPTION_OUTPUT_PATH);
    options.addOption(OPTION_HTABLE_NAME);
    parseOptions(options, args);

    Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));

    job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
    setJobClasspath(job);

    addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
    FileOutputFormat.setOutputPath(job, output);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(IICreateHFileMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    String tableName = getOptionValue(OPTION_HTABLE_NAME);
    HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
    HFileOutputFormat.configureIncrementalLoad(job, htable);

    this.deletePath(job.getConfiguration(), output);

    return waitForCompletion(job);
  } catch (Exception e) {
    printUsage(options);
    throw e;
  }
}
@Override
protected void preJobLaunch(CommandLine cmd, Job job) throws Exception {
  job.setJobName("Twitter HBase Bulk Load");

  htable = new HTable(getConf(), cmd.getOptionValue(HTABLE_OPT));
  HFileOutputFormat.configureIncrementalLoad(job, htable);
  HFileOutputFormat.setOutputPath(job, outputDir);
}
/**
 * Sets up the actual job.
 *
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
  String tableName = args[0];
  Path inputDir = new Path(args[1]);

  Job job = new Job(conf, "HBaseToHFileMapReduce");
  job.setJarByClass(HBaseToHFileMapReduce.class);
  FileInputFormat.setInputPaths(job, inputDir);
  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(HourlyImporter.class);

  if (args.length < 3) {
    // ++++ insert into table directly using TableOutputFormat ++++
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
  } else {
    // ++++ generate HFiles instead ++++
    HTable table = new HTable(conf, tableName);

    job.setReducerClass(PutSortReducer.class);
    Path outputDir = new Path(args[2]);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    HFileOutputFormat.configureIncrementalLoad(job, table);
  }

  TableMapReduceUtil.addDependencyJars(job);
  return job;
}
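// Hedged sketch of a driver for the factory method above. The class layout and argument
// handling are assumptions, not part of the original snippet; it only shows how such a
// createSubmittableJob method is typically submitted.
public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  Job job = createSubmittableJob(conf, args);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}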
public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
  HFileOutputFormat.configureIncrementalLoad(job, table);

  // Override OutputFormatClass
  job.setOutputFormatClass(IndexHFileOutputFormat.class);
}
public static Job createSubmittableJob(Configuration conf, String[] args)
    throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
  Path inputDir = new Path(args[0]);
  Path outputDir = new Path(args[1]);
  boolean createPartitionFile = Boolean.parseBoolean(args[2]);

  Job job = Job.getInstance(conf, "Import delicious RSS feed into Hush tables.");
  job.setJarByClass(BulkImportJobExample.class);

  job.setInputFormatClass(TextInputFormat.class);
  // conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
  FileInputFormat.setInputPaths(job, inputDir);

  job.setMapperClass(BulkImportMapper.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(Put.class);

  job.setPartitionerClass(TotalOrderPartitioner.class);
  job.setReducerClass(PutSortReducer.class);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);

  job.setOutputFormatClass(HFileOutputFormat.class);
  HFileOutputFormat.setOutputPath(job, outputDir);
  HFileOutputFormat.setCompressOutput(job, true);
  HFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
  job.getConfiguration().set("hfile.compression", "gz");
  // job.getConfiguration().setFloat("mapred.job.shuffle.input.buffer.percent", 0.5f);
  // job.setNumReduceTasks(30);

  Path partitionsPath = new Path(job.getWorkingDirectory(),
      "partitions_" + System.currentTimeMillis());
  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);

  if (createPartitionFile) {
    VerboseInputSampler.Sampler<KeyValue, ImmutableBytesWritable> sampler =
        new VerboseInputSampler.VerboseRandomSampler<KeyValue, ImmutableBytesWritable>(
            0.05, 1000000, 30); // use 0.1 for real sampling

    LOG.info("Sampling key space");
    VerboseInputSampler.writePartitionFile(job, sampler);
    LOG.info("Sampling done");
  }

  URI cacheUri = new URI(partitionsPath.toString() + "#" + TotalOrderPartitioner.DEFAULT_PATH);
  DistributedCache.addCacheFile(cacheUri, job.getConfiguration());
  DistributedCache.createSymlink(job.getConfiguration());

  return job;
}
/**
 * Sets up the actual job.
 *
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
    throws IOException, ClassNotFoundException {
  // Support non-XML supported characters by re-encoding the passed separator
  // as a Base64 string.
  String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
  if (actualSeparator != null) {
    conf.set(SEPARATOR_CONF_KEY, new String(Base64.encodeBytes(actualSeparator.getBytes())));
  }

  // See if a non-default Mapper was set
  String mapperClassName = conf.get(MAPPER_CONF_KEY);
  Class mapperClass = mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;

  String tableName = args[0];
  Path inputDir = new Path(args[1]);

  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(mapperClass);
  FileInputFormat.setInputPaths(job, inputDir);

  String inputCodec = conf.get(INPUT_LZO_KEY);
  if (inputCodec == null) {
    FileInputFormat.setMaxInputSplitSize(job, 67108864L); // max split size = 64m
    job.setInputFormatClass(TextInputFormat.class);
  } else {
    if (inputCodec.equalsIgnoreCase("lzo")) {
      job.setInputFormatClass(LzoTextInputFormat.class);
    } else {
      usage("not supported compression codec!");
      System.exit(-1);
    }
  }

  job.setMapperClass(mapperClass);

  String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
  if (hfileOutPath != null) {
    HTable table = new HTable(conf, tableName);
    job.setReducerClass(PutSortReducer.class);
    Path outputDir = new Path(hfileOutPath);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    HFileOutputFormat.configureIncrementalLoad(job, table);
  } else {
    // No reducers. Just write straight to the table. Call initTableReducerJob
    // to set up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
  }

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
      com.google.common.base.Function.class /* Guava used by TsvParser */);

  return job;
}
public int run(String[] args) throws Exception {
  Options options = new Options();
  try {
    options.addOption(OPTION_JOB_NAME);
    options.addOption(OPTION_CUBE_NAME);
    options.addOption(OPTION_INPUT_PATH);
    options.addOption(OPTION_OUTPUT_PATH);
    options.addOption(OPTION_HTABLE_NAME);
    parseOptions(options, args);

    Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
    String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();

    CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
    CubeInstance cube = cubeMgr.getCube(cubeName);

    job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
    setJobClasspath(job);

    addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
    FileOutputFormat.setOutputPath(job, output);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(CubeHFileMapper.class);
    job.setReducerClass(KeyValueSortReducer.class);

    // set job configuration
    job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
    Configuration conf = HBaseConfiguration.create(getConf());

    // add metadata to distributed cache
    attachKylinPropsAndMetadata(cube, job.getConfiguration());

    String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
    HTable htable = new HTable(conf, tableName);

    // Automatic config!
    HFileOutputFormat.configureIncrementalLoad(job, htable);

    // set block replication to 3 for hfiles
    conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");

    this.deletePath(job.getConfiguration(), output);

    return waitForCompletion(job);
  } catch (Exception e) {
    logger.error("error in CubeHFileJob", e);
    printUsage(options);
    throw e;
  }
}
private static void bulkLoadNewMacAddresses(Configuration conf, String inputPath,
    String outputPath, String tblName) throws Exception {

  // Pass parameters to MapReduce
  conf.set("hbase.table.name", tblName);
  conf.set("macs", macAddressesLine);

  // Workaround
  SchemaMetrics.configureGlobally(conf);

  // Load hbase-site.xml
  HBaseConfiguration.addHbaseResources(conf);

  // Create the job
  Job job = new Job(conf, "Load macAddresses in bloomfilters table");
  job.setJarByClass(MapperBulkLoadMacAddresses.class);
  job.setMapperClass(MapperBulkLoadMacAddresses.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(KeyValue.class);
  job.setInputFormatClass(TextInputFormat.class);

  // Get the table
  HTable hTable = new HTable(conf, tblName);

  // Auto-configure partitioner and reducer
  HFileOutputFormat.configureIncrementalLoad(job, hTable);

  // Set input and output paths
  FileInputFormat.addInputPath(job, new Path(inputPath));
  FileOutputFormat.setOutputPath(job, new Path(outputPath));

  // Wait for HFile creation
  job.waitForCompletion(true);

  // Load generated HFiles into the table
  LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  loader.doBulkLoad(new Path(outputPath), hTable);
}
/**
 * Load the radio map into HBase.
 *
 * @param conf
 * @throws Exception
 */
private static void bulkLoadVectormap(Configuration conf, String inputPath,
    String outputPath, String tblName) throws Exception {

  // Pass parameters to MapReduce
  conf.set("hbase.table.name", tblName);
  conf.set("macs", macAddressesLine);

  // Workaround
  SchemaMetrics.configureGlobally(conf);

  // Load hbase-site.xml
  HBaseConfiguration.addHbaseResources(conf);

  // Create the job
  Job job = new Job(conf, "Load radiomap in HBase");
  job.setJarByClass(MapperBulkLoadRadiomap.class);
  job.setMapperClass(MapperBulkLoadRadiomap.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(KeyValue.class);
  job.setInputFormatClass(TextInputFormat.class);

  // Get the table
  HTable hTable = new HTable(conf, tblName);

  // Auto-configure partitioner and reducer
  HFileOutputFormat.configureIncrementalLoad(job, hTable);

  // Set input and output paths
  FileInputFormat.addInputPath(job, new Path(inputPath));
  FileOutputFormat.setOutputPath(job, new Path(outputPath));

  // Wait for HFile creation
  job.waitForCompletion(true);

  // Load generated HFiles into the table
  LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  loader.doBulkLoad(new Path(outputPath), hTable);
}
public static void main(String[] args) throws Exception {
  if (args.length == 0) {
    System.out.println("PopulateSmallTable {numberOfMappers} {numberOfRecords} {tmpOutputPath} "
        + "{tableName} {columnFamily} {runID}");
    return;
  }

  String numberOfMappers = args[0];
  String numberOfRecords = args[1];
  String outputPath = args[2];
  String tableName = args[3];
  String columnFamily = args[4];
  String runID = args[5];

  // Create job
  Job job = Job.getInstance();
  HBaseConfiguration.addHbaseResources(job.getConfiguration());
  job.setJarByClass(PopulateTable.class);
  job.setJobName("PopulateTable: " + runID);
  job.getConfiguration().set(NUMBER_OF_RECORDS, numberOfRecords);
  job.getConfiguration().set(TABLE_NAME, tableName);
  job.getConfiguration().set(COLUMN_FAMILY, columnFamily);
  job.getConfiguration().set(RUN_ID, runID);

  // Define input format and path
  job.setInputFormatClass(NMapInputFormat.class);
  NMapInputFormat.setNumMapTasks(job.getConfiguration(), Integer.parseInt(numberOfMappers));

  Configuration config = HBaseConfiguration.create();
  HTable hTable = new HTable(config, tableName);

  // Auto-configure partitioner and reducer
  HFileOutputFormat.configureIncrementalLoad(job, hTable);
  FileOutputFormat.setOutputPath(job, new Path(outputPath));

  // Define the mapper and reducer
  job.setMapperClass(CustomMapper.class);
  // job.setReducerClass(CustomReducer.class);

  // Define the key and value format
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(KeyValue.class);

  // Run the job
  job.waitForCompletion(true);

  FileSystem hdfs = FileSystem.get(config);

  // HBase must have write access to the HFiles
  HFileUtils.changePermissionR(outputPath, hdfs);

  LoadIncrementalHFiles load = new LoadIncrementalHFiles(config);
  load.doBulkLoad(new Path(outputPath), hTable);
}
public static void main(String[] args) throws Exception {
  if (args.length == 0) {
    System.out.println("PopulateSmallTable {numberOfMappers} {numberOfRecords} {tmpOutputPath} "
        + "{smallTableName} {columnFamily} {runID}");
    return;
  }

  String numberOfMappers = args[0];
  String numberOfRecords = args[1];
  String outputPath = args[2];
  String tableName = args[3];
  String columnFamily = args[4];
  String runID = args[5];

  // Create job
  Job job = Job.getInstance();
  HBaseConfiguration.addHbaseResources(job.getConfiguration());
  job.setJarByClass(PopulateSmallTableMain.class);
  job.setJobName("PopulateSmallTableMain: " + runID);
  job.getConfiguration().set(NUMBER_OF_RECORDS, numberOfRecords);
  job.getConfiguration().set(TABLE_NAME, tableName);
  job.getConfiguration().set(COLUMN_FAMILY, columnFamily);
  job.getConfiguration().set(RUN_ID, runID);

  // Define input format and path
  job.setInputFormatClass(NMapInputFormat.class);
  NMapInputFormat.setNumMapTasks(job.getConfiguration(), Integer.parseInt(numberOfMappers));

  Configuration config = HBaseConfiguration.create();
  HTable hTable = new HTable(config, tableName);

  // Auto-configure partitioner and reducer
  HFileOutputFormat.configureIncrementalLoad(job, hTable);
  FileOutputFormat.setOutputPath(job, new Path(outputPath));

  // Define the mapper and reducer
  job.setMapperClass(CustomMapper.class);
  // job.setReducerClass(CustomReducer.class);

  // Define the key and value format
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(KeyValue.class);

  // Run the job
  job.waitForCompletion(true);

  FileSystem hdfs = FileSystem.get(config);

  // HBase must have write access to the HFiles
  HFileUtils.changePermissionR(outputPath, hdfs);

  LoadIncrementalHFiles load = new LoadIncrementalHFiles(config);
  load.doBulkLoad(new Path(outputPath), hTable);
}
public static void main(String[] args) throws Exception {
  if (args.length == 0) {
    System.out.println("ExampleHBaseBulkLoad {inputPath} {outputPath} {tableName} "
        + "{columnFamily} {runID}");
    return;
  }

  String inputPath = args[0];
  String outputPath = args[1];
  String tableName = args[2];
  String columnFamily = args[3];
  String runID = args[4];

  // Create job
  Job job = Job.getInstance();
  job.setJarByClass(ExampleHBaseBulkLoad.class);
  job.setJobName("ExampleHBaseBulkLoad: " + runID);
  job.getConfiguration().set(TABLE_NAME, tableName);
  job.getConfiguration().set(COLUMN_FAMILY, columnFamily);
  job.getConfiguration().set(RUN_ID, runID);

  // Define input format and path
  job.setInputFormatClass(TextInputFormat.class);
  TextInputFormat.addInputPath(job, new Path(inputPath));

  Configuration config = HBaseConfiguration.create();
  HTable hTable = new HTable(config, tableName);

  // Auto-configure partitioner and reducer
  HFileOutputFormat.configureIncrementalLoad(job, hTable);
  FileOutputFormat.setOutputPath(job, new Path(outputPath));

  // Define the mapper and reducer
  job.setMapperClass(CustomMapper.class);
  // job.setReducerClass(CustomReducer.class);

  // Define the key and value format
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(KeyValue.class);

  // Run the job
  job.waitForCompletion(true);

  FileSystem hdfs = FileSystem.get(config);

  // HBase must have write access to the HFiles
  HFileUtils.changePermissionR(outputPath, hdfs);

  LoadIncrementalHFiles load = new LoadIncrementalHFiles(config);
  load.doBulkLoad(new Path(outputPath), hTable);
}
private void configureBulkImport() throws IOException {
  HTable table = new HTable(getConfiguration(), _tableName);
  HFileOutputFormat.configureIncrementalLoad(_job, table);
  HFileOutputFormat.setOutputPath(_job, _hfilePath);
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  conf.set("hbase.table.name", TABLE_NAME);

  Job job = new Job(conf, "loadipdns");
  job.setJarByClass(LoadIPDns.class);

  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  HTable hTable = new HTable(conf, TABLE_NAME);
  HFileOutputFormat.configureIncrementalLoad(job, hTable);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  // job.setPartitionerClass(TotalOrderPartitioner.class);
  // job.setNumReduceTasks(113);
  // Path input = new Path(args[2]);
  // input = input.makeQualified(input.getFileSystem(conf));
  // Path partitionFile = new Path(input, "part-r-00000");
  // System.out.println("Partition file path:" + partitionFile);
  // TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
  // URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
  // DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
  // DistributedCache.createSymlink(job.getConfiguration());

  job.waitForCompletion(true);
  // LoadIPDns.loadHFiles();
}