/**
 * Runs a map-only job that reads through the given input format with
 * {@link ExampleVerifier} as the mapper, then asserts on the job's counters.
 *
 * @param clazz the input format implementation under test
 * @throws IOException if submitting or running the job fails
 */
void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
  final JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
  jobConf.setInputFormat(clazz);
  jobConf.setOutputFormat(NullOutputFormat.class);
  jobConf.setMapperClass(ExampleVerifier.class);
  jobConf.setNumReduceTasks(0);

  LOG.debug("submitting job.");
  final RunningJob runningJob = JobClient.runJob(jobConf);
  assertTrue("job failed!", runningJob.isSuccessful());

  // Counter groups are keyed by the test class name plus a ":row"/":family"/":value" suffix.
  final String groupPrefix = TestTableInputFormat.class.getName();
  final String rowGroup = groupPrefix + ":row";
  final String familyGroup = groupPrefix + ":family";
  final String valueGroup = groupPrefix + ":value";

  // "aaa" is the filtered-for row (expected twice); "bbb" is filtered out (expected never).
  assertEquals("Saw the wrong number of instances of the filtered-for row.", 2,
      runningJob.getCounters().findCounter(rowGroup, "aaa").getCounter());
  assertEquals("Saw any instances of the filtered out row.", 0,
      runningJob.getCounters().findCounter(rowGroup, "bbb").getCounter());

  // Each column is expected exactly once.
  assertEquals("Saw the wrong number of instances of columnA.", 1,
      runningJob.getCounters().findCounter(familyGroup, "columnA").getCounter());
  assertEquals("Saw the wrong number of instances of columnB.", 1,
      runningJob.getCounters().findCounter(familyGroup, "columnB").getCounter());

  // Value counters mirror the row expectations above.
  assertEquals("Saw the wrong count of values for the filtered-for row.", 2,
      runningJob.getCounters().findCounter(valueGroup, "value aaa").getCounter());
  assertEquals("Saw the wrong count of values for the filtered-out row.", 0,
      runningJob.getCounters().findCounter(valueGroup, "value bbb").getCounter());
}
/**
 * Prepares local scratch directories and the related configuration keys. Call this
 * before starting the HDFS and map-reduce mini-clusters.
 *
 * @throws IOException if one of the directory-setup helpers fails
 */
private void createDirsAndSetProperties() throws IOException {
  setupClusterTestDir();
  final String clusterTestPath = clusterTestDir.getPath();
  System.setProperty(TEST_DIRECTORY_KEY, clusterTestPath);

  // Each call creates a named directory and binds it to the given property key
  // (see createDirAndSetProperty for exactly where the value is recorded).
  createDirAndSetProperty("cache_data", "test.cache.data");
  createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
  hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
  createDirAndSetProperty("mapred_local", "mapreduce.cluster.local.dir");
  createDirAndSetProperty("mapred_temp", "mapreduce.cluster.temp.dir");
  enableShortCircuit();

  // Map-reduce working locations all live under a "hadoop" directory on the test FS.
  final Path hadoopRoot = getDataTestDirOnTestFS("hadoop");
  final String outputDir = new Path(hadoopRoot, "mapred-output-dir").toString();
  final String systemDir = new Path(hadoopRoot, "mapred-system-dir").toString();
  final String stagingRootDir =
      new Path(hadoopRoot, "mapreduce-jobtracker-staging-root-dir").toString();
  final String workingDir = new Path(hadoopRoot, "mapred-working-dir").toString();

  conf.set(MapreduceTestingShim.getMROutputDirProp(), outputDir);
  conf.set("mapreduce.jobtracker.system.dir", systemDir);
  conf.set("mapreduce.jobtracker.staging.root.dir", stagingRootDir);
  conf.set("mapreduce.job.working.dir", workingDir);
}
/**
 * Prepares local scratch directories and the related configuration keys, using the
 * classic {@code mapred.*} key names. Call this before starting the HDFS and
 * map-reduce mini-clusters.
 *
 * @throws IOException if one of the directory-setup helpers fails
 */
private void createDirsAndSetProperties() throws IOException {
  setupClusterTestDir();
  final String clusterTestPath = clusterTestDir.getPath();
  System.setProperty(TEST_DIRECTORY_KEY, clusterTestPath);

  // Each call creates a named directory and binds it to the given property key
  // (see createDirAndSetProperty for exactly where the value is recorded).
  createDirAndSetProperty("cache_data", "test.cache.data");
  createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
  hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
  createDirAndSetProperty("mapred_local", "mapred.local.dir");
  createDirAndSetProperty("mapred_temp", "mapred.temp.dir");
  enableShortCircuit();

  // Map-reduce working locations all live under a "hadoop" directory on the test FS.
  final Path hadoopRoot = getDataTestDirOnTestFS("hadoop");
  final String outputDir = new Path(hadoopRoot, "mapred-output-dir").toString();
  final String systemDir = new Path(hadoopRoot, "mapred-system-dir").toString();
  final String stagingRootDir =
      new Path(hadoopRoot, "mapreduce-jobtracker-staging-root-dir").toString();
  final String workingDir = new Path(hadoopRoot, "mapred-working-dir").toString();

  conf.set(MapreduceTestingShim.getMROutputDirProp(), outputDir);
  conf.set("mapred.system.dir", systemDir);
  conf.set("mapreduce.jobtracker.staging.root.dir", stagingRootDir);
  conf.set("mapred.working.dir", workingDir);
}
/**
 * Prepares local scratch directories and the related configuration keys, including
 * the YARN MapReduce application-master staging directory. Call this before starting
 * the HDFS and map-reduce mini-clusters.
 *
 * @throws IOException if one of the directory-setup helpers fails
 */
private void createDirsAndSetProperties() throws IOException {
  setupClusterTestDir();
  // The cluster test directory is recorded both in conf and as a JVM system property.
  final String clusterTestPath = clusterTestDir.getPath();
  conf.set(TEST_DIRECTORY_KEY, clusterTestPath);
  System.setProperty(TEST_DIRECTORY_KEY, clusterTestPath);

  // Each call creates a named directory and binds it to the given property key
  // (see createDirAndSetProperty for exactly where the value is recorded).
  createDirAndSetProperty("cache_data", "test.cache.data");
  createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
  hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
  createDirAndSetProperty("mapred_local", "mapreduce.cluster.local.dir");
  createDirAndSetProperty("mapred_temp", "mapreduce.cluster.temp.dir");
  enableShortCircuit();

  // Map-reduce working locations all live under a "hadoop" directory on the test FS.
  final Path hadoopRoot = getDataTestDirOnTestFS("hadoop");
  final String outputDir = new Path(hadoopRoot, "mapred-output-dir").toString();
  final String systemDir = new Path(hadoopRoot, "mapred-system-dir").toString();
  final String stagingRootDir =
      new Path(hadoopRoot, "mapreduce-jobtracker-staging-root-dir").toString();
  final String workingDir = new Path(hadoopRoot, "mapred-working-dir").toString();
  final String amStagingDir = new Path(hadoopRoot, "mapreduce-am-staging-root-dir").toString();

  conf.set(MapreduceTestingShim.getMROutputDirProp(), outputDir);
  conf.set("mapreduce.jobtracker.system.dir", systemDir);
  conf.set("mapreduce.jobtracker.staging.root.dir", stagingRootDir);
  conf.set("mapreduce.job.working.dir", workingDir);
  conf.set("yarn.app.mapreduce.am.staging-dir", amStagingDir);
}
/** * Starts a <code>MiniMRCluster</code>. * * @param servers The number of <code>TaskTracker</code>'s to start. * @throws IOException When starting the cluster fails. */ public void startMiniMapReduceCluster(final int servers) throws IOException { LOG.info("Starting mini mapreduce cluster..."); if (dataTestDir == null) { setupDataTestDir(); } // These are needed for the new and improved Map/Reduce framework Configuration c = getConfiguration(); String logDir = c.get("hadoop.log.dir"); String tmpDir = c.get("hadoop.tmp.dir"); if (logDir == null) { logDir = tmpDir; } System.setProperty("hadoop.log.dir", logDir); c.set("mapred.output.dir", tmpDir); // Tests were failing because this process used 6GB of virtual memory and was getting killed. // we up the VM usable so that processes don't get killed. conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f); mrCluster = new MiniMRCluster(0, 0, servers, FileSystem.get(conf).getUri().toString(), 1, null, null, null, new JobConf(conf)); JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); if (jobConf == null) { jobConf = mrCluster.createJobConf(); } jobConf.set("mapred.local.dir", conf.get("mapred.local.dir")); //Hadoop MiniMR overwrites this while it should not LOG.info("Mini mapreduce cluster started"); JobConf mrClusterJobConf = mrCluster.createJobConf(); c.set("mapred.job.tracker", mrClusterJobConf.get("mapred.job.tracker")); /* this for mrv2 support */ conf.set("mapreduce.framework.name", "yarn"); String rmAdress = mrClusterJobConf.get("yarn.resourcemanager.address"); if (rmAdress != null) { conf.set("yarn.resourcemanager.address", rmAdress); } String schedulerAdress = mrClusterJobConf.get("yarn.resourcemanager.scheduler.address"); if (schedulerAdress != null) { conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress); } }