public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl, boolean addDependencyJars, Class<? extends OutputFormat> outputFormatClass) throws IOException { Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); job.setOutputFormatClass(outputFormatClass); if (reducer != null) job.setReducerClass(reducer); conf.set(TableOutputFormat.OUTPUT_TABLE, table); // If passed a quorum/ensemble address, pass it on to TableOutputFormat. if (quorumAddress != null) { // Calling this will validate the format ZKUtil.transformClusterKey(quorumAddress); conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress); } if (serverClass != null && serverImpl != null) { conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl); } job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); if (partitioner == HRegionPartitioner.class) { job.setPartitionerClass(HRegionPartitioner.class); HTable outputTable = new HTable(conf, table); int regions = outputTable.getRegionsInfo().size(); if (job.getNumReduceTasks() > regions) { job.setNumReduceTasks(outputTable.getRegionsInfo().size()); } } else if (partitioner != null) { job.setPartitionerClass(partitioner); } if (addDependencyJars) { addDependencyJars(job); } TableMapReduceUtil.initCredentials(job); }
/** * Use this before submitting a TableReduce job. It will * appropriately set up the JobConf. * * @param table The output table. * @param reducer The reducer class to use. * @param job The current job to adjust. Make sure the passed job is * carrying all necessary HBase configuration. * @param partitioner Partitioner to use. Pass <code>null</code> to use * default partitioner. * @param quorumAddress Distant cluster to write to; default is null for * output to the cluster that is designated in <code>hbase-site.xml</code>. * Set this String to the zookeeper ensemble of an alternate remote cluster * when you would have the reduce write a cluster that is other than the * default; e.g. copying tables between clusters, the source would be * designated by <code>hbase-site.xml</code> and this param would have the * ensemble address of the remote cluster. The format to pass is particular. * Pass <code> <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent> * </code> such as <code>server,server2,server3:2181:/hbase</code>. * @param serverClass redefined hbase.regionserver.class * @param serverImpl redefined hbase.regionserver.impl * @param addDependencyJars upload HBase jars and jars for any of the configured * job classes via the distributed cache (tmpjars). * @throws IOException When determining the region count fails. */ public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl, boolean addDependencyJars) throws IOException { Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); job.setOutputFormatClass(TableOutputFormat.class); if (reducer != null) job.setReducerClass(reducer); conf.set(TableOutputFormat.OUTPUT_TABLE, table); // If passed a quorum/ensemble address, pass it on to TableOutputFormat. if (quorumAddress != null) { // Calling this will validate the format ZKUtil.transformClusterKey(quorumAddress); conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); } if (serverClass != null && serverImpl != null) { conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl); } job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); if (partitioner == HRegionPartitioner.class) { job.setPartitionerClass(HRegionPartitioner.class); HTable outputTable = new HTable(conf, table); int regions = outputTable.getRegionsInfo().size(); if (job.getNumReduceTasks() > regions) { job.setNumReduceTasks(outputTable.getRegionsInfo().size()); } } else if (partitioner != null) { job.setPartitionerClass(partitioner); } if (addDependencyJars) { addDependencyJars(job); } initCredentials(job); }
/** * Use this before submitting a TableReduce job. It will * appropriately set up the JobConf. * * @param table The output Splice table name, The format should be Schema.tableName. * @param reducer The reducer class to use. * @param job The current job to adjust. Make sure the passed job is * carrying all necessary configuration. * @param partitioner Partitioner to use. Pass <code>null</code> to use * default partitioner. * @param quorumAddress Distant cluster to write to; default is null for * output to the cluster that is designated in <code>hbase-site.xml</code>. * Set this String to the zookeeper ensemble of an alternate remote cluster * when you would have the reduce write a cluster that is other than the * default; e.g. copying tables between clusters, the source would be * designated by <code>hbase-site.xml</code> and this param would have the * ensemble address of the remote cluster. The format to pass is particular. * Pass <code> <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent> * </code> such as <code>server,server2,server3:2181:/hbase</code>. * @param serverClass redefined hbase.regionserver.class * @param serverImpl redefined hbase.regionserver.client * @param addDependencyJars upload HBase jars and jars for any of the configured * job classes via the distributed cache (tmpjars). * @throws IOException When determining the region count fails. * @throws SQLException */ public static void initTableReducerJob(String table, Class<? extends Reducer> reducer,Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl,boolean addDependencyJars,Class<? extends OutputFormat> outputformatClass) throws IOException{ Configuration conf=job.getConfiguration(); job.setOutputFormatClass(outputformatClass); if(reducer!=null) job.setReducerClass(reducer); conf.set(MRConstants.SPLICE_OUTPUT_TABLE_NAME,table); if(sqlUtil==null) sqlUtil=SMSQLUtil.getInstance(conf.get(MRConstants.SPLICE_JDBC_STR)); // If passed a quorum/ensemble address, pass it on to TableOutputFormat. String hbaseTableID=null; try{ hbaseTableID=sqlUtil.getConglomID(table); }catch(SQLException e){ // TODO Auto-generated catch block e.printStackTrace(); throw new IOException(e); } conf.set(MRConstants.HBASE_OUTPUT_TABLE_NAME,table); if(quorumAddress!=null){ // Calling this will validate the format HBasePlatformUtils.validateClusterKey(quorumAddress); conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); } if(serverClass!=null && serverImpl!=null){ conf.set(TableOutputFormat.REGION_SERVER_CLASS,serverClass); conf.set(TableOutputFormat.REGION_SERVER_IMPL,serverImpl); } job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Object.class); if(partitioner==HRegionPartitioner.class){ job.setPartitionerClass(HRegionPartitioner.class); // TODO Where are the keys? int regions=getReduceNumberOfRegions(hbaseTableID); if(job.getNumReduceTasks()>regions){ job.setNumReduceTasks(regions); } }else if(partitioner!=null){ job.setPartitionerClass(partitioner); } if(addDependencyJars){ addDependencyJars(job); } //initCredentials(job); }