Java 类org.apache.hadoop.mapred.FairScheduler.JobInfo 实例源码

项目:hadoop-2.6.0-cdh5.4.3    文件:TestFairScheduler.java   
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * with data on rack2.node2 and check that it doesn't get assigned on earlier
 * nodes. A second job with no locality info should get assigned instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  // Job 1 has a single map whose only declared location is rack2.node2 (tt4).
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      });
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them; delay
  // scheduling should make j1 pass on these non-local heartbeats.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
                         "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
                         "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
                         "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
                         "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Note: assertEquals
  // takes the expected value first; the original had the arguments
  // reversed, which produces misleading failure messages.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertFalse(info1.skippedAtLastHeartbeat);
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * We submit two jobs at an interval of 200ms such that job2 has 2x the
 * priority of job1, then wait for 100 ms, and check that all slots are
 * assigned to job 1 even though job 2 has higher priority and the fair
 * scheduler would have allocated at least a few slots to job 2.
 */
public void testFifoJobScheduler() throws Exception {
  // Set up pools file with a single FIFO pool.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"pool_a\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  // enable fifo
  out.println("<fifo>true</fifo>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit job 1; in FIFO order it should receive every slot. The JobInfo
  // lookups of the original were unused and have been dropped.
  submitJob(JobStatus.RUNNING, 10, 10, "pool_a");

  // Advance time 200ms and submit job 2 at HIGH priority.
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  job2.setPriority(JobPriority.HIGH);
  // Advance time 100ms
  advanceTime(100);

  // Assign tasks and check that all slots are given to job1, despite
  // job2's higher priority, because the pool schedules FIFO by start time.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * This test configures a pool pool_a, and redirects the default pool to it.
 */
public void testPoolRedirect() throws Exception {
  // Set up pools file.
  // pool_a has 0 totalInitedTasks, default does not have that restriction.
  // The redirect from default -> pool_a should enforce 0 total inited tasks.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
  out.println("<redirect>pool_a</redirect>");
  out.println("</pool>");
  out.println("<pool name=\"pool_a\">");
  out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit a job; it lands in the default pool and should be redirected.
  JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  Thread.sleep(1000L); // Let the JobInitializer thread finish its work.

  // Should have gone to pool_a, not default. (Expected value goes first
  // in assertEquals so a failure message reads correctly.)
  assertEquals("pool_a", info1.poolName);
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * Submits jobs into two pools, pool_a and pool_b. None of the pool_a jobs
 * have any map tasks; verify that this does not affect their reduce share.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write a pools file giving pool_a twice the weight of pool_b.
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocWriter.println("<?xml version=\"1.0\"?>");
  allocWriter.println("<allocations>");
  allocWriter.println("<pool name=\"pool_a\">");
  allocWriter.println("<weight>2.0</weight>");
  allocWriter.println("</pool>");
  allocWriter.println("<pool name=\"pool_b\">");
  allocWriter.println("<weight>1.0</weight>");
  allocWriter.println("</pool>");
  allocWriter.println("</allocations>");
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Two map-less jobs in pool_a, one normal job in pool_b; then advance
  // the clock so the scheduler recomputes shares.
  JobInProgress maplessJobA1 = submitJob(JobStatus.RUNNING, 0, 10, "pool_a");
  JobInfo maplessInfoA1 = scheduler.infos.get(maplessJobA1);
  JobInProgress maplessJobA2 = submitJob(JobStatus.RUNNING, 0, 10, "pool_a");
  JobInfo maplessInfoA2 = scheduler.infos.get(maplessJobA2);
  JobInProgress normalJobB = submitJob(JobStatus.RUNNING, 10, 10, "pool_b");
  JobInfo normalInfoB = scheduler.infos.get(normalJobB);
  advanceTime(10);

  // Map weight collapses to zero for the map-less jobs, but their reduce
  // weight is unaffected.
  assertEquals(0, maplessInfoA1.mapWeight, 0.01);
  assertEquals(1.0, maplessInfoA1.reduceWeight, 0.01);
  assertEquals(0, maplessInfoA2.mapWeight, 0.01);
  assertEquals(1.0, maplessInfoA2.reduceWeight, 0.01);
  assertEquals(1.0, normalInfoB.mapWeight, 0.01);
  assertEquals(1.0, normalInfoB.reduceWeight, 0.01);

  // Fair shares: pool_b's job gets all map slots; reduce slots split
  // evenly three ways across the jobs.
  assertEquals(0, maplessInfoA1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, maplessInfoA1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0, maplessInfoA2.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, maplessInfoA2.reduceFairShare, ALLOW_ERROR);
  assertEquals(4, normalInfoB.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, normalInfoB.reduceFairShare, ALLOW_ERROR);
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * Verify the FIFO pool weight adjustment: each earlier job in a FIFO pool
 * carries double the weight of the next one.
 */
public void testPoolFifoWeight() throws Exception {
  // Pools file declaring a single FIFO pool.
  PrintWriter alloc = new PrintWriter(new FileWriter(ALLOC_FILE));
  alloc.println("<?xml version=\"1.0\"?>");
  alloc.println("<allocations>");
  alloc.println("<pool name=\"pool_a\">");
  alloc.println("<fifo>true</fifo>");
  alloc.println("</pool>");
  alloc.println("</allocations>");
  alloc.close();

  scheduler.getPoolManager().reloadAllocs();
  // Submit four identical jobs, advancing the clock between submissions
  // (by 1, 2, 3 ms) so their FIFO order is deterministic.
  JobInProgress[] jobs = new JobInProgress[4];
  for (int i = 0; i < jobs.length; i++) {
    if (i > 0) {
      advanceTime(i);
    }
    jobs[i] = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  }
  scheduler.update();

  // Weights halve with each FIFO position: 8 : 4 : 2 : 1, normalized
  // over the total of 15.
  final double ALLOWED_ERROR = 0.00001;
  assertEquals(8.0 / 15, scheduler.infos.get(jobs[0]).mapWeight, ALLOWED_ERROR);
  assertEquals(4.0 / 15, scheduler.infos.get(jobs[1]).mapWeight, ALLOWED_ERROR);
  assertEquals(2.0 / 15, scheduler.infos.get(jobs[2]).mapWeight, ALLOWED_ERROR);
  assertEquals(1.0 / 15, scheduler.infos.get(jobs[3]).mapWeight, ALLOWED_ERROR);
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * Verify how the minimum slots of a FIFO pool are handed out: earlier
 * jobs are satisfied in full before later jobs receive anything.
 */
public void testPoolFifoMin() throws Exception {
  // Pools file: one FIFO pool guaranteed 12 map and 12 reduce slots.
  PrintWriter alloc = new PrintWriter(new FileWriter(ALLOC_FILE));
  alloc.println("<?xml version=\"1.0\"?>");
  alloc.println("<allocations>");
  alloc.println("<pool name=\"pool_a\">");
  alloc.println("<fifo>true</fifo>");
  alloc.println("<minMaps>12</minMaps>");
  alloc.println("<minReduces>12</minReduces>");
  alloc.println("</pool>");
  alloc.println("</allocations>");
  alloc.close();

  scheduler.getPoolManager().reloadAllocs();
  // Four jobs of 5 maps / 5 reduces each, submitted at strictly
  // increasing times (gaps of 1, 2, 3 ms) to fix the FIFO order.
  JobInProgress[] jobs = new JobInProgress[4];
  for (int i = 0; i < jobs.length; i++) {
    if (i > 0) {
      advanceTime(i);
    }
    jobs[i] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  }
  scheduler.update();

  // 12 guaranteed slots split FIFO-first: 5 + 5 to the first two jobs,
  // the remaining 2 to the third, nothing left for the fourth.
  int[] expectedMinSlots = {5, 5, 2, 0};
  for (int i = 0; i < jobs.length; i++) {
    JobInfo info = scheduler.infos.get(jobs[i]);
    assertEquals(expectedMinSlots[i], info.minMaps);
    assertEquals(expectedMinSlots[i], info.minReduces);
  }
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * Verify how the maximum slots of a FIFO pool are handed out: earlier
 * jobs claim their full demand before later jobs receive anything.
 */
public void testPoolFifoMax() throws Exception {
  // Pools file: one FIFO pool capped at 12 map and 12 reduce slots.
  PrintWriter alloc = new PrintWriter(new FileWriter(ALLOC_FILE));
  alloc.println("<?xml version=\"1.0\"?>");
  alloc.println("<allocations>");
  alloc.println("<pool name=\"pool_a\">");
  alloc.println("<fifo>true</fifo>");
  alloc.println("<maxMaps>12</maxMaps>");
  alloc.println("<maxReduces>12</maxReduces>");
  alloc.println("</pool>");
  alloc.println("</allocations>");
  alloc.close();

  scheduler.getPoolManager().reloadAllocs();
  // Four jobs of 5 maps / 5 reduces each, submitted at strictly
  // increasing times (gaps of 1, 2, 3 ms) to fix the FIFO order.
  JobInProgress[] jobs = new JobInProgress[4];
  for (int i = 0; i < jobs.length; i++) {
    if (i > 0) {
      advanceTime(i);
    }
    jobs[i] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  }
  scheduler.update();

  // The 12-slot cap is consumed FIFO-first: 5 + 5 by the first two jobs,
  // 2 left for the third, none for the fourth.
  int[] expectedMaxSlots = {5, 5, 2, 0};
  for (int i = 0; i < jobs.length; i++) {
    JobInfo info = scheduler.infos.get(jobs[i]);
    assertEquals(expectedMaxSlots[i], info.maxMaps);
    assertEquals(expectedMaxSlots[i], info.maxReduces);
  }
}
项目:hadoop-on-lustre    文件:TestFairScheduler.java   
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * with data on rack2.node2 and check that it doesn't get assigned on earlier
 * nodes. A second job with no locality info should get assigned instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  // Job 1 has a single map whose only declared location is rack2.node2 (tt4).
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them; delay
  // scheduling should make j1 pass on these non-local heartbeats.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
                         "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
                         "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
                         "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
                         "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Note: assertEquals
  // takes the expected value first; the original had the arguments
  // reversed, which produces misleading failure messages.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertFalse(info1.skippedAtLastHeartbeat);
}
项目:RDFS    文件:TestFairScheduler.java   
/**
 * We submit two jobs at an interval of 200ms such that job2 has 2x the
 * priority of job1, then wait for 100 ms, and check that all slots are
 * assigned to job 1 even though job 2 has higher priority and the fair
 * scheduler would have allocated at least a few slots to job 2.
 */
public void testFifoJobScheduler() throws Exception {
  // Set up pools file with a single FIFO pool.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"pool_a\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  // enable fifo
  out.println("<fifo>true</fifo>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit job 1; in FIFO order it should receive every slot. The JobInfo
  // lookups of the original were unused and have been dropped.
  submitJob(JobStatus.RUNNING, 10, 10, "pool_a");

  // Advance time 200ms and submit job 2 at HIGH priority.
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  job2.setPriority(JobPriority.HIGH);
  // Advance time 100ms
  advanceTime(100);

  // Assign tasks and check that all slots are given to job1, despite
  // job2's higher priority, because the pool schedules FIFO by start time.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
}
项目:RDFS    文件:TestFairScheduler.java   
/**
 * This test configures a pool pool_a, and redirects the default pool to it.
 */
public void testPoolRedirect() throws Exception {
  // Set up pools file.
  // pool_a has 0 totalInitedTasks, default does not have that restriction.
  // The redirect from default -> pool_a should enforce 0 total inited tasks.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
  out.println("<redirect>pool_a</redirect>");
  out.println("</pool>");
  out.println("<pool name=\"pool_a\">");
  out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit a job; it lands in the default pool and should be redirected.
  JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  Thread.sleep(1000L); // Let the JobInitializer thread finish its work.

  // Should have gone to pool_a, not default. (Expected value goes first
  // in assertEquals so a failure message reads correctly.)
  assertEquals("pool_a", info1.poolName);
}
项目:RDFS    文件:TestFairScheduler.java   
/**
 * Submits jobs into two pools, pool_a and pool_b. None of the pool_a jobs
 * have any map tasks; verify that this does not affect their reduce share.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write a pools file giving pool_a twice the weight of pool_b.
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocWriter.println("<?xml version=\"1.0\"?>");
  allocWriter.println("<allocations>");
  allocWriter.println("<pool name=\"pool_a\">");
  allocWriter.println("<weight>2.0</weight>");
  allocWriter.println("</pool>");
  allocWriter.println("<pool name=\"pool_b\">");
  allocWriter.println("<weight>1.0</weight>");
  allocWriter.println("</pool>");
  allocWriter.println("</allocations>");
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Two map-less jobs in pool_a, one normal job in pool_b; then advance
  // the clock so the scheduler recomputes shares.
  JobInProgress maplessJobA1 = submitJob(JobStatus.RUNNING, 0, 10, "pool_a");
  JobInfo maplessInfoA1 = scheduler.infos.get(maplessJobA1);
  JobInProgress maplessJobA2 = submitJob(JobStatus.RUNNING, 0, 10, "pool_a");
  JobInfo maplessInfoA2 = scheduler.infos.get(maplessJobA2);
  JobInProgress normalJobB = submitJob(JobStatus.RUNNING, 10, 10, "pool_b");
  JobInfo normalInfoB = scheduler.infos.get(normalJobB);
  advanceTime(10);

  // Map weight collapses to zero for the map-less jobs, but their reduce
  // weight is unaffected.
  assertEquals(0, maplessInfoA1.mapWeight, 0.01);
  assertEquals(1.0, maplessInfoA1.reduceWeight, 0.01);
  assertEquals(0, maplessInfoA2.mapWeight, 0.01);
  assertEquals(1.0, maplessInfoA2.reduceWeight, 0.01);
  assertEquals(1.0, normalInfoB.mapWeight, 0.01);
  assertEquals(1.0, normalInfoB.reduceWeight, 0.01);

  // Fair shares: pool_b's job gets all map slots; reduce slots split
  // evenly three ways across the jobs.
  assertEquals(0, maplessInfoA1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, maplessInfoA1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0, maplessInfoA2.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, maplessInfoA2.reduceFairShare, ALLOW_ERROR);
  assertEquals(4, normalInfoB.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, normalInfoB.reduceFairShare, ALLOW_ERROR);
}
项目:RDFS    文件:TestFairScheduler.java   
/**
 * Verify the FIFO pool weight adjustment: each earlier job in a FIFO pool
 * carries double the weight of the next one.
 */
public void testPoolFifoWeight() throws Exception {
  // Pools file declaring a single FIFO pool.
  PrintWriter alloc = new PrintWriter(new FileWriter(ALLOC_FILE));
  alloc.println("<?xml version=\"1.0\"?>");
  alloc.println("<allocations>");
  alloc.println("<pool name=\"pool_a\">");
  alloc.println("<fifo>true</fifo>");
  alloc.println("</pool>");
  alloc.println("</allocations>");
  alloc.close();

  scheduler.getPoolManager().reloadAllocs();
  // Submit four identical jobs, advancing the clock between submissions
  // (by 1, 2, 3 ms) so their FIFO order is deterministic.
  JobInProgress[] jobs = new JobInProgress[4];
  for (int i = 0; i < jobs.length; i++) {
    if (i > 0) {
      advanceTime(i);
    }
    jobs[i] = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  }
  scheduler.update();

  // Weights halve with each FIFO position: 8 : 4 : 2 : 1, normalized
  // over the total of 15.
  final double ALLOWED_ERROR = 0.00001;
  assertEquals(8.0 / 15, scheduler.infos.get(jobs[0]).mapWeight, ALLOWED_ERROR);
  assertEquals(4.0 / 15, scheduler.infos.get(jobs[1]).mapWeight, ALLOWED_ERROR);
  assertEquals(2.0 / 15, scheduler.infos.get(jobs[2]).mapWeight, ALLOWED_ERROR);
  assertEquals(1.0 / 15, scheduler.infos.get(jobs[3]).mapWeight, ALLOWED_ERROR);
}
项目:RDFS    文件:TestFairScheduler.java   
/**
 * Verify how the minimum slots of a FIFO pool are handed out: earlier
 * jobs are satisfied in full before later jobs receive anything.
 */
public void testPoolFifoMin() throws Exception {
  // Pools file: one FIFO pool guaranteed 12 map and 12 reduce slots.
  PrintWriter alloc = new PrintWriter(new FileWriter(ALLOC_FILE));
  alloc.println("<?xml version=\"1.0\"?>");
  alloc.println("<allocations>");
  alloc.println("<pool name=\"pool_a\">");
  alloc.println("<fifo>true</fifo>");
  alloc.println("<minMaps>12</minMaps>");
  alloc.println("<minReduces>12</minReduces>");
  alloc.println("</pool>");
  alloc.println("</allocations>");
  alloc.close();

  scheduler.getPoolManager().reloadAllocs();
  // Four jobs of 5 maps / 5 reduces each, submitted at strictly
  // increasing times (gaps of 1, 2, 3 ms) to fix the FIFO order.
  JobInProgress[] jobs = new JobInProgress[4];
  for (int i = 0; i < jobs.length; i++) {
    if (i > 0) {
      advanceTime(i);
    }
    jobs[i] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  }
  scheduler.update();

  // 12 guaranteed slots split FIFO-first: 5 + 5 to the first two jobs,
  // the remaining 2 to the third, nothing left for the fourth.
  int[] expectedMinSlots = {5, 5, 2, 0};
  for (int i = 0; i < jobs.length; i++) {
    JobInfo info = scheduler.infos.get(jobs[i]);
    assertEquals(expectedMinSlots[i], info.minMaps);
    assertEquals(expectedMinSlots[i], info.minReduces);
  }
}
项目:RDFS    文件:TestFairScheduler.java   
/**
 * Verify how the maximum slots of a FIFO pool are handed out: earlier
 * jobs claim their full demand before later jobs receive anything.
 */
public void testPoolFifoMax() throws Exception {
  // Pools file: one FIFO pool capped at 12 map and 12 reduce slots.
  PrintWriter alloc = new PrintWriter(new FileWriter(ALLOC_FILE));
  alloc.println("<?xml version=\"1.0\"?>");
  alloc.println("<allocations>");
  alloc.println("<pool name=\"pool_a\">");
  alloc.println("<fifo>true</fifo>");
  alloc.println("<maxMaps>12</maxMaps>");
  alloc.println("<maxReduces>12</maxReduces>");
  alloc.println("</pool>");
  alloc.println("</allocations>");
  alloc.close();

  scheduler.getPoolManager().reloadAllocs();
  // Four jobs of 5 maps / 5 reduces each, submitted at strictly
  // increasing times (gaps of 1, 2, 3 ms) to fix the FIFO order.
  JobInProgress[] jobs = new JobInProgress[4];
  for (int i = 0; i < jobs.length; i++) {
    if (i > 0) {
      advanceTime(i);
    }
    jobs[i] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  }
  scheduler.update();

  // The 12-slot cap is consumed FIFO-first: 5 + 5 by the first two jobs,
  // 2 left for the third, none for the fourth.
  int[] expectedMaxSlots = {5, 5, 2, 0};
  for (int i = 0; i < jobs.length; i++) {
    JobInfo info = scheduler.infos.get(jobs[i]);
    assertEquals(expectedMaxSlots[i], info.maxMaps);
    assertEquals(expectedMaxSlots[i], info.maxReduces);
  }
}
项目:hadoop-0.20    文件:TestFairScheduler.java   
/**
 * Submits jobs into two pools, poolA and poolB. None of the poolA jobs
 * have any map tasks; verify that this does not affect their reduce share.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write a pools file giving poolA twice the weight of poolB.
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocWriter.println("<?xml version=\"1.0\"?>");
  allocWriter.println("<allocations>");
  allocWriter.println("<pool name=\"poolA\">");
  allocWriter.println("<weight>2.0</weight>");
  allocWriter.println("</pool>");
  allocWriter.println("<pool name=\"poolB\">");
  allocWriter.println("<weight>1.0</weight>");
  allocWriter.println("</pool>");
  allocWriter.println("</allocations>");
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Two map-less jobs in poolA, one normal job in poolB; then advance
  // the clock so the scheduler recomputes shares.
  JobInProgress maplessJobA1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo maplessInfoA1 = scheduler.infos.get(maplessJobA1);
  JobInProgress maplessJobA2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo maplessInfoA2 = scheduler.infos.get(maplessJobA2);
  JobInProgress normalJobB = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo normalInfoB = scheduler.infos.get(normalJobB);
  advanceTime(10);

  // Map weight collapses to zero for the map-less jobs, but their reduce
  // weight is unaffected.
  assertEquals(0, maplessInfoA1.mapWeight, 0.01);
  assertEquals(1.0, maplessInfoA1.reduceWeight, 0.01);
  assertEquals(0, maplessInfoA2.mapWeight, 0.01);
  assertEquals(1.0, maplessInfoA2.reduceWeight, 0.01);
  assertEquals(1.0, normalInfoB.mapWeight, 0.01);
  assertEquals(1.0, normalInfoB.reduceWeight, 0.01);

  // Fair shares: poolB's job gets all map slots; reduce slots split
  // evenly three ways across the jobs.
  assertEquals(0, maplessInfoA1.mapFairShare, 0.01);
  assertEquals(1.33, maplessInfoA1.reduceFairShare, 0.01);
  assertEquals(0, maplessInfoA2.mapFairShare, 0.01);
  assertEquals(1.33, maplessInfoA2.reduceFairShare, 0.01);
  assertEquals(4, normalInfoB.mapFairShare, 0.01);
  assertEquals(1.33, normalInfoB.reduceFairShare, 0.01);
}
项目:hanoi-hadoop-2.0.0-cdh    文件:TestFairScheduler.java   
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * with data on rack2.node2 and check that it doesn't get assigned on earlier
 * nodes. A second job with no locality info should get assigned instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  // Job 1 has a single map whose only declared location is rack2.node2 (tt4).
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      });
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them; delay
  // scheduling should make j1 pass on these non-local heartbeats.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
                         "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
                         "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
                         "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
                         "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Note: assertEquals
  // takes the expected value first; the original had the arguments
  // reversed, which produces misleading failure messages.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertFalse(info1.skippedAtLastHeartbeat);
}
项目:mapreduce-fork    文件:TestFairScheduler.java   
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * with data on rack2.node2 and check that it doesn't get assigned on earlier
 * nodes. A second job with no locality info should get assigned instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  // Job 1 has a single map whose only declared location is rack2.node2 (tt4).
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them; delay
  // scheduling should make j1 pass on these non-local heartbeats.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
                         "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
                         "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
                         "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
                         "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Note: assertEquals
  // takes the expected value first; the original had the arguments
  // reversed, which produces misleading failure messages.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertFalse(info1.skippedAtLastHeartbeat);
}
项目:hortonworks-extension    文件:TestFairScheduler.java   
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * with data on rack2.node2 and check that it doesn't get assigned on earlier
 * nodes. A second job with no locality info should get assigned instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  // Job 1 has a single map whose only declared location is rack2.node2 (tt4).
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them; delay
  // scheduling should make j1 pass on these non-local heartbeats.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
                         "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
                         "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
                         "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
                         "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Note: assertEquals
  // takes the expected value first; the original had the arguments
  // reversed, which produces misleading failure messages.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertFalse(info1.skippedAtLastHeartbeat);
}
项目:hortonworks-extension    文件:TestFairScheduler.java   
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * with data on rack2.node2 and check that it doesn't get assigned on earlier
 * nodes. A second job with no locality info should get assigned instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  // Job 1 has a single map whose only declared location is rack2.node2 (tt4).
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them; delay
  // scheduling should make j1 pass on these non-local heartbeats.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
                         "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
                         "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
                         "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
                         "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Note: assertEquals
  // takes the expected value first; the original had the arguments
  // reversed, which produces misleading failure messages.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertFalse(info1.skippedAtLastHeartbeat);
}
项目:hadoop-gpu    文件:TestFairScheduler.java   
/**
 * Submits jobs into two pools, poolA and poolB. None of the poolA jobs
 * have any map tasks; verify that this does not affect their reduce share.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write a pools file giving poolA twice the weight of poolB.
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocWriter.println("<?xml version=\"1.0\"?>");
  allocWriter.println("<allocations>");
  allocWriter.println("<pool name=\"poolA\">");
  allocWriter.println("<weight>2.0</weight>");
  allocWriter.println("</pool>");
  allocWriter.println("<pool name=\"poolB\">");
  allocWriter.println("<weight>1.0</weight>");
  allocWriter.println("</pool>");
  allocWriter.println("</allocations>");
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Two map-less jobs in poolA, one normal job in poolB; then advance
  // the clock so the scheduler recomputes shares.
  JobInProgress maplessJobA1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo maplessInfoA1 = scheduler.infos.get(maplessJobA1);
  JobInProgress maplessJobA2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo maplessInfoA2 = scheduler.infos.get(maplessJobA2);
  JobInProgress normalJobB = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo normalInfoB = scheduler.infos.get(normalJobB);
  advanceTime(10);

  // Map weight collapses to zero for the map-less jobs, but their reduce
  // weight is unaffected.
  assertEquals(0, maplessInfoA1.mapWeight, 0.01);
  assertEquals(1.0, maplessInfoA1.reduceWeight, 0.01);
  assertEquals(0, maplessInfoA2.mapWeight, 0.01);
  assertEquals(1.0, maplessInfoA2.reduceWeight, 0.01);
  assertEquals(1.0, normalInfoB.mapWeight, 0.01);
  assertEquals(1.0, normalInfoB.reduceWeight, 0.01);

  // Fair shares: poolB's job gets all map slots; reduce slots split
  // evenly three ways across the jobs.
  assertEquals(0, maplessInfoA1.mapFairShare, 0.01);
  assertEquals(1.33, maplessInfoA1.reduceFairShare, 0.01);
  assertEquals(0, maplessInfoA2.mapFairShare, 0.01);
  assertEquals(1.33, maplessInfoA2.reduceFairShare, 0.01);
  assertEquals(4, normalInfoB.mapFairShare, 0.01);
  assertEquals(1.33, normalInfoB.reduceFairShare, 0.01);
}
Project: hadoop-2.6.0-cdh5.4.3    File: JobSchedulable.java
private boolean isRunnable() {
  // A job is schedulable only if the scheduler is tracking it, has marked
  // it runnable, and the job itself reports the RUNNING state.
  JobInfo jobInfo = scheduler.getJobInfo(job);
  if (jobInfo == null || !jobInfo.runnable) {
    return false;
  }
  return job.getStatus().getRunState() == JobStatus.RUNNING;
}
Project: hadoop-2.6.0-cdh5.4.3    File: PoolSchedulable.java
public void addJob(JobInProgress job) {
  // Register the job's schedulable matching this pool's task type.
  JobInfo info = scheduler.getJobInfo(job);
  if (taskType == TaskType.MAP) {
    jobScheds.add(info.mapSchedulable);
  } else {
    jobScheds.add(info.reduceSchedulable);
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test contains two jobs with fewer required tasks than there are slots.
 * We check that all tasks are assigned, but job 1 gets them first because it
 * was submitted earlier.
 */
public void testSmallJobs() throws IOException {
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo info1 = scheduler.infos.get(job1);

  // Check scheduler variables: nothing running yet, demand mirrors the
  // task counts, and a lone job's fair share is capped at its demand.
  assertEquals(0,    info1.mapSchedulable.getRunningTasks());
  assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(2,    info1.mapSchedulable.getDemand());
  assertEquals(1,    info1.reduceSchedulable.getDemand());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
  verifyMetrics();

  // Advance time before submitting another job j2, to make j1 run before j2
  // deterministically.
  advanceTime(100);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables: job1 unchanged; job2's shares likewise
  // capped at its demand since total demand fits in the cluster.
  assertEquals(0,    info1.mapSchedulable.getRunningTasks());
  assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(2,    info1.mapSchedulable.getDemand());
  assertEquals(1,    info1.reduceSchedulable.getDemand());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(0,    info2.mapSchedulable.getRunningTasks());
  assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
  assertEquals(1,    info2.mapSchedulable.getDemand());
  assertEquals(2,    info2.reduceSchedulable.getDemand());
  assertEquals(1.0,  info2.mapSchedulable.getFairShare());
  assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
  verifyMetrics();

  // Assign tasks and check that jobs alternate in filling slots
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  assertNull(scheduler.assignTasks(tracker("tt2")));

  // Check that the scheduler has started counting the tasks as running
  // as soon as it launched them.
  assertEquals(2,  info1.mapSchedulable.getRunningTasks());
  assertEquals(1,  info1.reduceSchedulable.getRunningTasks());
  assertEquals(2,  info1.mapSchedulable.getDemand());
  assertEquals(1,  info1.reduceSchedulable.getDemand());
  assertEquals(1,  info2.mapSchedulable.getRunningTasks());
  assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(2, info2.reduceSchedulable.getDemand());
  verifyMetrics();
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test is identical to testSmallJobs but sets assignMultiple to
 * true so that multiple tasks can be assigned per heartbeat.
 */
public void testSmallJobsWithAssignMultiple() throws IOException {
  // 1 rack, 2 nodes, assignMultiple enabled.
  setUpCluster(1, 2, true);

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo info1 = scheduler.infos.get(job1);

  // Check scheduler variables: nothing running yet, demand mirrors the
  // task counts, and a lone job's fair share is capped at its demand.
  assertEquals(0,    info1.mapSchedulable.getRunningTasks());
  assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(2,    info1.mapSchedulable.getDemand());
  assertEquals(1,    info1.reduceSchedulable.getDemand());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
  verifyMetrics();

  // Advance time before submitting another job j2, to make j1 run before j2
  // deterministically.
  advanceTime(100);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables: job1 unchanged; job2's shares likewise
  // capped at its demand since total demand fits in the cluster.
  assertEquals(0,    info1.mapSchedulable.getRunningTasks());
  assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(2,    info1.mapSchedulable.getDemand());
  assertEquals(1,    info1.reduceSchedulable.getDemand());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(0,    info2.mapSchedulable.getRunningTasks());
  assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
  assertEquals(1,    info2.mapSchedulable.getDemand());
  assertEquals(2,    info2.reduceSchedulable.getDemand());
  assertEquals(1.0,  info2.mapSchedulable.getFairShare());
  assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
  verifyMetrics();

  // Assign tasks and check that jobs alternate in filling slots; with
  // assignMultiple, tt1's four slots are filled in a single heartbeat.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
                         "attempt_test_0001_r_000000_0 on tt1",
                         "attempt_test_0002_m_000000_0 on tt1",
                         "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
                         "attempt_test_0002_r_000001_0 on tt2");
  assertNull(scheduler.assignTasks(tracker("tt2")));

  // Check that the scheduler has started counting the tasks as running
  // as soon as it launched them.
  assertEquals(2,  info1.mapSchedulable.getRunningTasks());
  assertEquals(1,  info1.reduceSchedulable.getRunningTasks());
  assertEquals(2,  info1.mapSchedulable.getDemand());
  assertEquals(1,  info1.reduceSchedulable.getDemand());
  assertEquals(1,  info2.mapSchedulable.getRunningTasks());
  assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(2, info2.reduceSchedulable.getDemand());
  verifyMetrics();
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * We submit two jobs such that one has 2x the priority of the other to
 * a cluster of 3 nodes, wait for 100 ms, and check that the weights/shares
 * are set so that the high-priority job gets 4 tasks of each type while
 * the normal-priority job gets 2.
 */
public void testJobsWithPriorities() throws IOException {
  // 1 rack, 3 nodes, no assignMultiple.
  setUpCluster(1, 3, false);

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  // HIGH priority gives job2 twice job1's weight; refresh shares.
  job2.setPriority(JobPriority.HIGH);
  scheduler.update();

  // Check scheduler variables: 6 slots of each type split 2:4 by weight.
  assertEquals(0,   info1.mapSchedulable.getRunningTasks());
  assertEquals(0,   info1.reduceSchedulable.getRunningTasks());
  assertEquals(10,  info1.mapSchedulable.getDemand());
  assertEquals(10,  info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare(), 0.1);
  assertEquals(2.0, info1.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(0,   info2.mapSchedulable.getRunningTasks());
  assertEquals(0,   info2.reduceSchedulable.getRunningTasks());
  assertEquals(10,  info2.mapSchedulable.getDemand());
  assertEquals(10,  info2.reduceSchedulable.getDemand());
  assertEquals(4.0, info2.mapSchedulable.getFairShare(), 0.1);
  assertEquals(4.0, info2.reduceSchedulable.getFairShare(), 0.1);

  // Advance time
  advanceTime(100);

  // Assign tasks and check that j2 gets 2x more tasks than j1. In addition,
  // whenever the jobs' runningTasks/weight ratios are tied, j1 should get
  // the new task first because it started first; thus the tasks of each
  // type should be handed out alternately to 1, 2, 2, 1, 2, 2, etc.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_r_000002_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_r_000003_0 on tt3");
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test starts by submitting three large jobs:
 * - job1 in the default pool, at time 0
 * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
 * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 300
 * 
 * We then assign tasks to all slots. The maps should be assigned in the
 * order job2, job3, job 3, job1 because jobs 3 and 2 have guaranteed slots
 * (1 and 2 respectively). Job2 comes before job3 when they are both at 0
 * slots because it has an earlier start time. In a similar manner,
 * reduces should be assigned as job2, job3, job2, job1.
 */
public void testLargeJobsWithPools() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 1 map, 2 reduces
  out.println("<pool name=\"poolA\">");
  out.println("<minMaps>1</minMaps>");
  out.println("<minReduces>2</minReduces>");
  out.println("</pool>");
  // Give pool B a minimum of 2 maps, 1 reduce
  out.println("<pool name=\"poolB\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>1</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool defaultPool = scheduler.getPoolManager().getPool("default");
  Pool poolA = scheduler.getPoolManager().getPool("poolA");
  Pool poolB = scheduler.getPoolManager().getPool("poolB");

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);

  // Check scheduler variables: while job1 is alone it gets all 4 slots.
  assertEquals(0,    info1.mapSchedulable.getRunningTasks());
  assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(10,   info1.mapSchedulable.getDemand());
  assertEquals(10,   info1.reduceSchedulable.getDemand());
  assertEquals(4.0,  info1.mapSchedulable.getFairShare());
  assertEquals(4.0,  info1.reduceSchedulable.getFairShare());

  // Advance time 200ms and submit jobs 2 and 3
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(100);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info3 = scheduler.infos.get(job3);

  // Check that minimum and fair shares have been allocated: each pool's
  // min share flows to its single job; job1 keeps what remains.
  assertEquals(0,    defaultPool.getMapSchedulable().getMinShare());
  assertEquals(0,    defaultPool.getReduceSchedulable().getMinShare());
  assertEquals(1.0,  info1.mapSchedulable.getFairShare());
  assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(1,    poolA.getMapSchedulable().getMinShare());
  assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
  assertEquals(1.0,  info2.mapSchedulable.getFairShare());
  assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
  assertEquals(2,    poolB.getMapSchedulable().getMinShare());
  assertEquals(1,    poolB.getReduceSchedulable().getMinShare());
  assertEquals(2.0,  info3.mapSchedulable.getFairShare());
  assertEquals(1.0,  info3.reduceSchedulable.getFairShare());

  // Advance time 100ms
  advanceTime(100);

  // Assign tasks and check that slots are first given to needy jobs
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test starts by submitting three large jobs:
 * - job1 in the default pool, at time 0
 * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
 * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
 * 
 * After this, we start assigning tasks. The first two tasks of each type
 * should be assigned to job2 and job3 since they are in a pool with an
 * allocation guarantee, but the next two slots should be assigned to job 1
 * because the pool will no longer be needy.
 */
public void testLargeJobsWithExcessCapacity() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 2 maps, 2 reduces
  out.println("<pool name=\"poolA\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool poolA = scheduler.getPoolManager().getPool("poolA");

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);

  // Check scheduler variables: while job1 is alone it gets all 4 slots.
  assertEquals(0,    info1.mapSchedulable.getRunningTasks());
  assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(10,   info1.mapSchedulable.getDemand());
  assertEquals(10,   info1.reduceSchedulable.getDemand());
  assertEquals(4.0,  info1.mapSchedulable.getFairShare());
  assertEquals(4.0,  info1.reduceSchedulable.getFairShare());

  // Advance time 200ms and submit job 2
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);

  // Check that minimum and fair shares have been allocated: the 4 slots
  // of each type are now split evenly between the two jobs.
  assertEquals(2,    poolA.getMapSchedulable().getMinShare());
  assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(2.0,  info2.mapSchedulable.getFairShare());
  assertEquals(2.0,  info2.reduceSchedulable.getFairShare());

  // Advance time 100ms and submit job 3
  advanceTime(100);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info3 = scheduler.infos.get(job3);

  // Check that minimum and fair shares have been allocated: jobs 2 and 3
  // now split poolA's share of 2 between them.
  assertEquals(2,    poolA.getMapSchedulable().getMinShare());
  assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(1.0,  info2.mapSchedulable.getFairShare());
  assertEquals(1.0,  info2.reduceSchedulable.getFairShare());
  assertEquals(1.0,  info3.mapSchedulable.getFairShare());
  assertEquals(1.0,  info3.reduceSchedulable.getFairShare());

  // Advance time
  advanceTime(100);

  // Assign tasks and check that slots are first given to needy jobs, but
  // that job 1 gets two tasks after due to having a larger share.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * A copy of testLargeJobsWithExcessCapacity that enables assigning multiple
 * tasks per heartbeat. Results should match testLargeJobsWithExcessCapacity.
 */
public void testLargeJobsWithExcessCapacityAndAssignMultiple() 
    throws Exception {
  // 1 rack, 2 nodes, assignMultiple enabled.
  setUpCluster(1, 2, true);

  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 2 maps, 2 reduces
  out.println("<pool name=\"poolA\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool poolA = scheduler.getPoolManager().getPool("poolA");

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);

  // Check scheduler variables: while job1 is alone it gets all 4 slots.
  assertEquals(0,    info1.mapSchedulable.getRunningTasks());
  assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(10,   info1.mapSchedulable.getDemand());
  assertEquals(10,   info1.reduceSchedulable.getDemand());
  assertEquals(4.0,  info1.mapSchedulable.getFairShare());
  assertEquals(4.0,  info1.reduceSchedulable.getFairShare());

  // Advance time 200ms and submit job 2
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);

  // Check that minimum and fair shares have been allocated: the 4 slots
  // of each type are now split evenly between the two jobs.
  assertEquals(2,    poolA.getMapSchedulable().getMinShare());
  assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(2.0,  info2.mapSchedulable.getFairShare());
  assertEquals(2.0,  info2.reduceSchedulable.getFairShare());

  // Advance time 100ms and submit job 3
  advanceTime(100);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info3 = scheduler.infos.get(job3);

  // Check that minimum and fair shares have been allocated: jobs 2 and 3
  // now split poolA's share of 2 between them.
  assertEquals(2,    poolA.getMapSchedulable().getMinShare());
  assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(1.0,  info2.mapSchedulable.getFairShare());
  assertEquals(1.0,  info2.reduceSchedulable.getFairShare());
  assertEquals(1.0,  info3.mapSchedulable.getFairShare());
  assertEquals(1.0,  info3.reduceSchedulable.getFairShare());

  // Advance time
  advanceTime(100);

  // Assign tasks and check that slots are first given to needy jobs, but
  // that job 1 gets two tasks after due to having a larger share.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
                         "attempt_test_0002_r_000000_0 on tt1",
                         "attempt_test_0003_m_000000_0 on tt1",
                         "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2",
                         "attempt_test_0001_r_000000_0 on tt2",
                         "attempt_test_0001_m_000001_0 on tt2",
                         "attempt_test_0001_r_000001_0 on tt2");
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test starts by submitting two jobs at time 0:
 * - job1 in the default pool
 * - job2, with 1 map and 1 reduce, in poolA, which has an alloc of 4
 *   maps and 4 reduces
 * 
 * When we assign the slots, job2 should only get 1 of each type of task.
 */
public void testSmallJobInLargePool() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 4 maps, 4 reduces
  out.println("<pool name=\"poolA\">");
  out.println("<minMaps>4</minMaps>");
  out.println("<minReduces>4</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 1, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables: job2's fair share is capped at its demand
  // of 1 despite the large pool allocation; job1 gets the remaining 3.
  assertEquals(0,    info1.mapSchedulable.getRunningTasks());
  assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(10,   info1.mapSchedulable.getDemand());
  assertEquals(10,   info1.reduceSchedulable.getDemand());
  assertEquals(3.0,  info1.mapSchedulable.getFairShare());
  assertEquals(3.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(0,    info2.mapSchedulable.getRunningTasks());
  assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
  assertEquals(1,    info2.mapSchedulable.getDemand());
  assertEquals(1,    info2.reduceSchedulable.getDemand());
  assertEquals(1.0,  info2.mapSchedulable.getFairShare());
  assertEquals(1.0,  info2.reduceSchedulable.getFairShare());

  // Assign tasks and check that slots are first given to needy jobs
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test starts by submitting four jobs in the default pool. However, the
 * maxRunningJobs limit for this pool has been set to two. We should see only
 * the first two jobs get scheduled, each with half the total slots.
 */
public void testPoolMaxJobs() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxRunningJobs>2</maxRunningJobs>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info4 = scheduler.infos.get(job4);

  // Check scheduler variables: jobs 3 and 4 exceed the limit, so they are
  // not runnable and get zero fair share.
  assertEquals(2.0,  info1.mapSchedulable.getFairShare());
  assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
  assertEquals(2.0,  info2.mapSchedulable.getFairShare());
  assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
  assertEquals(0.0,  info3.mapSchedulable.getFairShare());
  assertEquals(0.0,  info3.reduceSchedulable.getFairShare());
  assertEquals(0.0,  info4.mapSchedulable.getFairShare());
  assertEquals(0.0,  info4.reduceSchedulable.getFairShare());

  // Assign tasks and check that only jobs 1 and 2 get them
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test starts by submitting two jobs by user "user1" to the default
 * pool, and two jobs by "user2". We set user1's job limit to 1. We should
 * see one job from user1 and two from user2. 
 */
public void testUserMaxJobs() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<user name=\"user1\">");
  out.println("<maxRunningJobs>1</maxRunningJobs>");
  out.println("</user>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times. NOTE(review): user.name is set after
  // submitJob — presumably the scheduler reads it lazily during update;
  // verify against the scheduler implementation.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  job1.getJobConf().set("user.name", "user1");
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  job2.getJobConf().set("user.name", "user1");
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  job3.getJobConf().set("user.name", "user2");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  job4.getJobConf().set("user.name", "user2");
  JobInfo info4 = scheduler.infos.get(job4);

  // Check scheduler variables: job2 exceeds user1's limit of 1 running
  // job, so it is not runnable and gets zero fair share.
  assertEquals(1.33,  info1.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33,  info1.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(0.0,   info2.mapSchedulable.getFairShare());
  assertEquals(0.0,   info2.reduceSchedulable.getFairShare());
  assertEquals(1.33,  info3.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33,  info3.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(1.33,  info4.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33,  info4.reduceSchedulable.getFairShare(), 0.1);

  // Assign tasks and check that slots are given only to jobs 1, 3 and 4
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test submits jobs in three pools: poolA, which has a weight
 * of 2.0; poolB, which has a weight of 0.5; and the default pool, which
 * should have a weight of 1.0. It then checks that the map and reduce
 * fair shares are given out accordingly. We then submit a second job to
 * pool B and check that each gets half of the pool (weight of 0.25).
 */
public void testPoolWeights() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"poolA\">");
  out.println("<weight>2.0</weight>");
  out.println("</pool>");
  out.println("<pool name=\"poolB\">");
  out.println("<weight>0.5</weight>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit one job to each pool, then advance time so shares update.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);

  // 4 slots split by weight 1 : 2 : 0.5 => 1.14 : 2.28 : 0.57.
  assertEquals(1.14,  info1.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.14,  info1.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(2.28,  info2.mapSchedulable.getFairShare(), 0.01);
  assertEquals(2.28,  info2.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.57,  info3.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.57,  info3.reduceSchedulable.getFairShare(), 0.01);

  JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info4 = scheduler.infos.get(job4);
  advanceTime(10);

  // poolB's 0.57 is now halved between its two jobs.
  assertEquals(1.14,  info1.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.14,  info1.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(2.28,  info2.mapSchedulable.getFairShare(), 0.01);
  assertEquals(2.28,  info2.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.28,  info3.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.28,  info3.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.28,  info4.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.28,  info4.reduceSchedulable.getFairShare(), 0.01);
  verifyMetrics();
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestFairScheduler.java
/**
 * This test submits jobs in two pools, poolA and poolB. None of the
 * jobs in poolA have maps, but this should not affect their reduce
 * share.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Set up pools file: poolA weight 2.0, poolB weight 1.0.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"poolA\">");
  out.println("<weight>2.0</weight>");
  out.println("</pool>");
  out.println("<pool name=\"poolB\">");
  out.println("<weight>1.0</weight>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit two reduce-only jobs to poolA and one regular job to poolB,
  // then advance time so the scheduler recomputes shares.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);

  // job3 is the only job demanding maps, so it takes all 4 map slots.
  // The 4 reduce slots split evenly: ~1.33 per job (poolA's weight-2
  // share of ~2.67 is divided across its two jobs, poolB gets ~1.33).
  assertEquals(0,     info1.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.33,  info1.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0,     info2.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.33,  info2.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(4,     info3.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.33,  info3.reduceSchedulable.getFairShare(), 0.01);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestFairScheduler.java   
/**
 * This test submits a job that takes all slots in pool1, which has a min
 * share of 2 slots of each type and a minshare preemption timeout of 5s,
 * and then a second job in the default pool under a fair-share preemption
 * timeout of 5s. After a 6-second gap without updates, the default pool
 * is starved of its fair share (2 slots of each type), and we test that
 * it does not kill more than 2 tasks of each type.
 */
public void testFairSharePreemptionWithShortTimeout() throws Exception {
  // Enable preemption in scheduler
  scheduler.preemptionEnabled = true;
  // Set up pools file: 5s fair-share timeout globally, and pool1 with
  // a 2-map/2-reduce min share plus a 5s min-share timeout.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<fairSharePreemptionTimeout>5</fairSharePreemptionTimeout>");
  out.println("<pool name=\"pool1\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool pool1 = scheduler.getPoolManager().getPool("pool1");
  Pool defaultPool = scheduler.getPoolManager().getPool("default");

  // Submit job 1 and assign all slots to it. Sleep a bit before assigning
  // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool1");
  JobInfo info1 = scheduler.infos.get(job1);
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");

  advanceTime(10000);
  assertEquals(4,    info1.mapSchedulable.getRunningTasks());
  assertEquals(4,    info1.reduceSchedulable.getRunningTasks());
  // Use the delta overload: JUnit 3 has no assertEquals(double, double),
  // so the two-argument form falls back to boxed-Object equality, which
  // compares doubles exactly. An explicit tolerance is robust and matches
  // the style used by the other fair-share assertions in this file.
  assertEquals(4.0,  info1.mapSchedulable.getFairShare(), 0.01);
  assertEquals(4.0,  info1.reduceSchedulable.getFairShare(), 0.01);
  // Ten seconds later, submit job 2.
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "default");

  // Advance time by 6 seconds without updating the scheduler.
  // This simulates the time gap between update and task preemption.
  clock.advance(6000);
  assertEquals(4,    info1.mapSchedulable.getRunningTasks());
  assertEquals(4,    info1.reduceSchedulable.getRunningTasks());
  assertEquals(2.0,  info1.mapSchedulable.getFairShare(), 0.01);
  assertEquals(2.0,  info1.reduceSchedulable.getFairShare(), 0.01);
  // pool1 is at or above its share, so nothing should be preempted from
  // its perspective; the default pool is owed 2 slots of each type.
  assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(),
      clock.getTime()));
  assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(),
      clock.getTime()));
  assertEquals(2, scheduler.tasksToPreempt(defaultPool.getMapSchedulable(),
      clock.getTime()));
  assertEquals(2, scheduler.tasksToPreempt(defaultPool.getReduceSchedulable(),
      clock.getTime()));

  // Test that the tasks actually get preempted and we can assign new ones
  scheduler.preemptTasksIfNecessary();
  scheduler.update();
  assertEquals(2, job1.runningMaps());
  assertEquals(2, job1.runningReduces());
  checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  assertNull(scheduler.assignTasks(tracker("tt1")));
  assertNull(scheduler.assignTasks(tracker("tt2")));
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * This test contains two jobs with fewer required tasks than there are slots.
 * We check that all tasks are assigned, but job 1 gets them first because it
 * was submitted earlier.
 */
public void testSmallJobs() throws IOException {
  // Job 1: 2 maps, 1 reduce.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo info1 = scheduler.infos.get(job1);

  // Check scheduler variables: nothing is running yet, so job 1 still
  // needs all of its tasks, carries no deficit, and its fair share
  // covers every task it has.
  assertEquals(0,    info1.runningMaps);
  assertEquals(0,    info1.runningReduces);
  assertEquals(2,    info1.neededMaps);
  assertEquals(1,    info1.neededReduces);
  assertEquals(0,    info1.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0,    info1.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(2.0,  info1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0,  info1.reduceFairShare, ALLOW_ERROR);

  // Advance time before submitting another job j2, to make j1 run before j2
  // deterministically.
  advanceTime(100);
  // Job 2: 1 map, 2 reduces.
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables; the fair shares should now have been allocated
  // equally between j1 and j2, but j1 should have (2 slots)*(100 ms) map
  // deficit and (1 slots) * (100 ms) reduce deficit
  assertEquals(0,    info1.runningMaps);
  assertEquals(0,    info1.runningReduces);
  assertEquals(2,    info1.neededMaps);
  assertEquals(1,    info1.neededReduces);
  assertEquals(200,  info1.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(100,  info1.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(2.0,  info1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0,  info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0,    info2.runningMaps);
  assertEquals(0,    info2.runningReduces);
  assertEquals(1,    info2.neededMaps);
  assertEquals(2,    info2.neededReduces);
  assertEquals(0,    info2.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0,    info2.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(1.0,  info2.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0,  info2.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that all slots are filled with j1, then j2.
  // j1's higher deficit means its tasks come first; once j1 has no more
  // needed tasks of a type, j2's tasks fill the remaining slots.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  assertNull(scheduler.assignTasks(tracker("tt2")));

  // Check that the scheduler has started counting the tasks as running
  // as soon as it launched them.
  assertEquals(2,  info1.runningMaps);
  assertEquals(1,  info1.runningReduces);
  assertEquals(0,  info1.neededMaps);
  assertEquals(0,  info1.neededReduces);
  assertEquals(1,  info2.runningMaps);
  assertEquals(2,  info2.runningReduces);
  assertEquals(0, info2.neededMaps);
  assertEquals(0, info2.neededReduces);
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * Submits two jobs where the second is given twice the priority of the
 * first, waits 100 ms, and verifies that weights/deficits reflect the
 * 1:2 split and that every free slot goes to the high-priority job.
 */
public void testJobsWithPriorities() throws IOException {
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  job2.setPriority(JobPriority.HIGH);
  scheduler.update();

  // Neither job has started; fair shares split 1:2 by priority weight.
  JobInfo[] infos = { info1, info2 };
  double[] shares = { 1.33, 2.66 };
  for (int i = 0; i < infos.length; i++) {
    assertEquals(0,    infos[i].runningMaps);
    assertEquals(0,    infos[i].runningReduces);
    assertEquals(10,   infos[i].neededMaps);
    assertEquals(10,   infos[i].neededReduces);
    assertEquals(0,    infos[i].mapDeficit, ALLOW_DEFICIT_ERROR);
    assertEquals(0,    infos[i].reduceDeficit, ALLOW_DEFICIT_ERROR);
    assertEquals(shares[i], infos[i].mapFairShare, ALLOW_ERROR);
    assertEquals(shares[i], infos[i].reduceFairShare, ALLOW_ERROR);
  }

  // After 100 ms the deficits accumulate in proportion to the shares.
  advanceTime(100);
  double[] deficits = { 133, 266 };
  for (int i = 0; i < infos.length; i++) {
    assertEquals(deficits[i], infos[i].mapDeficit, 1.0);
    assertEquals(deficits[i], infos[i].reduceDeficit, 1.0);
  }

  // All eight slots should be handed to the high-priority job (job 2).
  for (String expected : new String[] {
      "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1",
      "attempt_test_0002_r_000000_0 on tt1",
      "attempt_test_0002_r_000001_0 on tt1" }) {
    checkAssignment("tt1", expected);
  }
  for (String expected : new String[] {
      "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2",
      "attempt_test_0002_r_000002_0 on tt2",
      "attempt_test_0002_r_000003_0 on tt2" }) {
    checkAssignment("tt2", expected);
  }
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * This test starts by submitting two jobs at time 0:
 * - job1 in the default pool
 * - job2, with 1 map and 1 reduce, in pool_a, which has an alloc of 4
 *   maps and 4 reduces
 *
 * When we assign the slots, job2 should only get 1 of each type of task.
 *
 * The fair share for job 2 should be 2.0 however, because even though it is
 * running only one task, it accumulates deficit in case it will have failures
 * or need speculative tasks later. (TODO: This may not be a good policy.)
 */
public void testSmallJobInLargePool() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 4 maps, 4 reduces
  out.println("<pool name=\"pool_a\">");
  out.println("<minMaps>4</minMaps>");
  out.println("<minReduces>4</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // job1 (default pool): 10 maps, 10 reduces.
  // job2 (pool_a): 1 map, 1 reduce.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 1, "pool_a");
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables: job2's share is capped at its demand
  // (1 task of each type), leaving job1 the remaining 3 slots of each.
  assertEquals(0,    info1.runningMaps);
  assertEquals(0,    info1.runningReduces);
  assertEquals(10,   info1.neededMaps);
  assertEquals(10,   info1.neededReduces);
  assertEquals(0,    info1.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0,    info1.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(3.0,  info1.mapFairShare, ALLOW_ERROR);
  assertEquals(3.0,  info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0,    info2.runningMaps);
  assertEquals(0,    info2.runningReduces);
  assertEquals(1,    info2.neededMaps);
  assertEquals(1,    info2.neededReduces);
  assertEquals(0,    info2.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0,    info2.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(1.0,  info2.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0,  info2.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that slots are first given to needy jobs:
  // job2 (below its min share) gets one task of each type first, then
  // job1 fills everything that remains.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * This test starts by submitting four jobs in the default pool. However, the
 * maxRunningJobs limit for this pool has been set to two. We should see only
 * the first two jobs get scheduled, each with half the total slots.
 */
public void testPoolMaxJobs() throws Exception {
  // Set up pools file: cap the default pool at two running jobs.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxRunningJobs>2</maxRunningJobs>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times. They start in PREP so that the
  // scheduler's job initializer decides which ones to initialize.
  JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info4 = scheduler.infos.get(job4);

  Thread.sleep(1000L); // Let the JobInitializer thread finish its work

  // Only two of the jobs should be initialized.
  assertTrue(((FakeJobInProgress)job1).isInitialized());
  assertTrue(((FakeJobInProgress)job2).isInitialized());
  assertFalse(((FakeJobInProgress)job3).isInitialized());
  assertFalse(((FakeJobInProgress)job4).isInitialized());

  // Check scheduler variables: jobs over the pool's running-job limit
  // (3 and 4) receive no fair share at all.
  assertEquals(2.0,  info1.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0,  info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(2.0,  info2.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0,  info2.reduceFairShare, ALLOW_ERROR);
  assertEquals(0.0,  info3.mapFairShare, ALLOW_ERROR);
  assertEquals(0.0,  info3.reduceFairShare, ALLOW_ERROR);
  assertEquals(0.0,  info4.mapFairShare, ALLOW_ERROR);
  assertEquals(0.0,  info4.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that slots go only to jobs 1 and 2.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * This test configures a pool pool_a, tries the submit a job
 * before and after blacklisting of pool_a.
 *
 * NOTE(review): the method name does not follow lowerCamelCase
 * ("testPoolBlacklisted"); left unchanged to preserve the public name.
 */
public void testpool_blacklisted() throws Exception {
  // Set up pools file with pool_a present but not yet blacklisted.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
  out.println("</pool>");
  out.println("<pool name=\"pool_a\">");
  out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit a job to pool_a while it is not yet blacklisted.
  JobInProgress job1 =
      submitJobNoInitialization(JobStatus.PREP, 10, 10, "pool_a");
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  Thread.sleep(1000L); // Let the JobInitializer thread finish its work

  // Rewrite the pools file, this time marking pool_a as blacklisted.
  out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
  out.println("</pool>");
  out.println("<pool name=\"pool_a\">");
  out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
  out.println("<blacklisted/>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit a job to the newly blacklisted pool_a
  JobInProgress job2 =
      submitJobNoInitialization(JobStatus.PREP, 10, 10, "pool_a");
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  Thread.sleep(1000L); // Let the JobInitializer thread finish its work

  // job1 was submitted before pool_a was blacklisted, so it stays there
  assertEquals(info1.poolName, "pool_a");
  // job2 arrived after the blacklisting, so it falls back to default
  assertEquals(info2.poolName, "default");
}
项目:hadoop-EAR    文件:TestFairScheduler.java   
/**
 * Submits two jobs as "user1" and two as "user2" into the default pool,
 * with user1's running-job limit set to 1. Exactly one of user1's jobs
 * and both of user2's jobs should be scheduled.
 */
public void testUserMaxJobs() throws Exception {
  // Write an allocations file capping user1 at a single running job.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<user name=\"user1\">");
  out.println("<maxRunningJobs>1</maxRunningJobs>");
  out.println("</user>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit the four jobs at distinct times so their ordering in the
  // scheduler is deterministic; the first two belong to user1.
  String[] owners = { "user1", "user1", "user2", "user2" };
  JobInfo[] infos = new JobInfo[owners.length];
  for (int i = 0; i < owners.length; i++) {
    if (i > 0) {
      advanceTime(10);
    }
    JobInProgress job = submitJob(JobStatus.RUNNING, 10, 10);
    job.getJobConf().set("user.name", owners[i]);
    infos[i] = scheduler.infos.get(job);
  }

  // user1's second job exceeds the per-user limit and gets no fair
  // share; the other three jobs split the slots evenly.
  double[] shares = { 1.33, 0.0, 1.33, 1.33 };
  for (int i = 0; i < shares.length; i++) {
    assertEquals(shares[i], infos[i].mapFairShare, ALLOW_ERROR);
    assertEquals(shares[i], infos[i].reduceFairShare, ALLOW_ERROR);
  }

  // Slots should go to jobs 1 and 3 (user1's first job and user2's
  // first job), in submission order.
  for (String expected : new String[] {
      "attempt_test_0001_m_000000_0 on tt1",
      "attempt_test_0001_m_000001_0 on tt1",
      "attempt_test_0001_r_000000_0 on tt1",
      "attempt_test_0001_r_000001_0 on tt1" }) {
    checkAssignment("tt1", expected);
  }
  advanceTime(100);
  for (String expected : new String[] {
      "attempt_test_0003_m_000000_0 on tt2",
      "attempt_test_0003_m_000001_0 on tt2",
      "attempt_test_0003_r_000000_0 on tt2",
      "attempt_test_0003_r_000001_0 on tt2" }) {
    checkAssignment("tt2", expected);
  }
}