Java 类org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress 实例源码

项目:mapreduce-fork    文件:TestTrackerReservation.java   
/**
 * Test case to check task tracker reservation for a job which
 * has a job blacklisted tracker.
 * <ol>
 * <li>Run a job which fails on one of the tracker.</li>
 * <li>Check if the job succeeds and has no reservation.</li>
 * </ol>
 *
 * @throws Exception
 */
public void testTrackerReservationWithJobBlackListedTracker() throws Exception {
  // Run a job that gets one tracker blacklisted for the job.
  FakeJobInProgress job = TestTaskTrackerBlacklisting.runBlackListingJob(
      jobTracker, trackers);
  assertEquals("Job has no blacklisted trackers", 1, job
      .getBlackListedTrackers().size());
  assertTrue("Tracker 1 not blacklisted for the job", job
      .getBlackListedTrackers().contains(
          JobInProgress.convertTrackerNameToHostName(trackers[0])));
  // JUnit convention: expected value first, actual second (the original
  // had them reversed, producing a misleading failure message).
  assertEquals("Job did not complete successfully", JobStatus.SUCCEEDED,
      job.getStatus().getRunState());
  assertEquals("Reservation for the job not released: Maps",
      0, job.getNumReservedTaskTrackersForMaps());
  assertEquals("Reservation for the job not released : Reduces",
      0, job.getNumReservedTaskTrackersForReduces());
  // Cluster-wide metrics must also show no reserved slots remain.
  ClusterMetrics metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match",
      0, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
      0, metrics.getReservedReduceSlots());
}
项目:mapreduce-fork    文件:TestSetupTaskScheduling.java   
/**
 * Fabricates a {@code TaskStatus} for a task of {@code job} on
 * {@code tracker} and appends it to {@code reports}.
 *
 * For MAP/REDUCE a fresh RUNNING status is built around a newly
 * scheduled attempt; for TASK_CLEANUP the stored status of attempt 0 of
 * the first map (or first reduce, per {@code useMapSlot}) is reused.
 */
void addNewTaskStatus(FakeJobInProgress job, TaskType taskType,
      boolean useMapSlot, String tracker, List<TaskStatus> reports)
      throws IOException {
  TaskStatus report;
  switch (taskType) {
    case MAP:
      report = new MapTaskStatus(job.findMapTask(tracker), 0.01f, 2,
            TaskStatus.State.RUNNING, "", "", tracker,
            TaskStatus.Phase.MAP, new Counters());
      break;
    case TASK_CLEANUP:
      report = useMapSlot
          ? job.maps[0].taskStatuses.get(
              new TaskAttemptID(job.maps[0].getTIPId(), 0))
          : job.reduces[0].taskStatuses.get(
              new TaskAttemptID(job.reduces[0].getTIPId(), 0));
      break;
    default:
      report = new ReduceTaskStatus(job.findReduceTask(tracker), 0.01f, 2,
            TaskStatus.State.RUNNING, "", "", tracker,
            TaskStatus.Phase.REDUCE, new Counters());
  }
  reports.add(report);
}
项目:mapreduce-fork    文件:TestSetupTaskScheduling.java   
/**
 * Verifies that a pending job-setup task is handed out against a free
 * map slot on an idle tasktracker.
 * @throws IOException
 */
public void testSetupTaskReturnedForFreeMapSlots() throws IOException {
  // Register a job whose setup task is still pending.
  FakeJobInProgress setupJob = createJob(TaskType.JOB_SETUP);
  jobTracker.jobs.put(setupJob.getJobID(), setupJob);

  // Simulate a completely idle tasktracker (no task reports at all).
  List<TaskStatus> emptyReports = new ArrayList<TaskStatus>();
  TaskTrackerStatus trackerStatus =
      createTaskTrackerStatus(trackers[2], emptyReports);

  // The setup task should be scheduled, and against a map slot.
  List<Task> assigned = jobTracker.getSetupAndCleanupTasks(trackerStatus);
  assertEquals(1, assigned.size());
  Task setupTask = assigned.get(0);
  assertTrue(setupTask.isJobSetupTask());
  assertTrue(setupTask.isMapTask());
  jobTracker.jobs.clear();
}
项目:mapreduce-fork    文件:TestSetupTaskScheduling.java   
/**
 * Test to check that map slots are counted when returning
 * a setup task: with the tracker's map slot occupied, the setup task
 * must come back against a non-map slot.
 * @throws IOException
 */
public void testMapSlotsCountedForSetup() throws IOException {
  // Job whose setup task is pending.
  FakeJobInProgress setupJob = createJob(TaskType.JOB_SETUP);
  jobTracker.jobs.put(setupJob.getJobID(), setupJob);

  // A second job used purely for slot reservation.
  FakeJobInProgress reservingJob = createJob(null);
  jobTracker.jobs.put(reservingJob.getJobID(), reservingJob);

  // Build a tracker status whose map slot is already in use.
  List<TaskStatus> statuses = new ArrayList<TaskStatus>();
  addNewTaskStatus(setupJob, TaskType.MAP, true, trackers[0], statuses);
  TaskTrackerStatus trackerStatus =
      createTaskTrackerStatus(trackers[0], statuses);

  // The setup task must not be returned on the busy map slot.
  List<Task> assigned = jobTracker.getSetupAndCleanupTasks(trackerStatus);
  assertEquals(1, assigned.size());
  Task setupTask = assigned.get(0);
  assertTrue(setupTask.isJobSetupTask());
  assertFalse(setupTask.isMapTask());
  jobTracker.jobs.clear();
}
项目:mapreduce-fork    文件:TestSetupTaskScheduling.java   
/**
 * Test to check that reduce slots are also counted when returning
 * a setup task: with both map and reduce slots occupied, no setup task
 * may be handed out at all.
 * @throws IOException
 */
public void testReduceSlotsCountedForSetup() throws IOException {
  // Job whose setup task is pending.
  FakeJobInProgress setupJob = createJob(TaskType.JOB_SETUP);
  jobTracker.jobs.put(setupJob.getJobID(), setupJob);

  // Second job used to occupy the tracker's slots.
  FakeJobInProgress reservingJob = createJob(null);
  jobTracker.jobs.put(reservingJob.getJobID(), reservingJob);

  // Free map slots are checked first by the scheduler, so fill the map
  // slot in addition to the reduce slot.
  List<TaskStatus> statuses = new ArrayList<TaskStatus>();
  addNewTaskStatus(reservingJob, TaskType.MAP, true, trackers[1], statuses);
  addNewTaskStatus(reservingJob, TaskType.REDUCE, false, trackers[1], statuses);
  TaskTrackerStatus trackerStatus =
      createTaskTrackerStatus(trackers[1], statuses);

  // Both slot types are busy, so nothing should be returned.
  assertNull(jobTracker.getSetupAndCleanupTasks(trackerStatus));
  jobTracker.jobs.clear();
}
项目:mapreduce-fork    文件:TestSetupTaskScheduling.java   
/**
 * Test to check that map slots are counted when returning
 * a taskCleanup task.
 * @throws IOException
 */
public void testNumSlotsUsedForTaskCleanup() throws IOException {
  // Create a high-RAM job (map/reduce tasks require 2 slots each) that
  // has both a map task's cleanup task and a reduce task's cleanup task.
  FakeJobInProgress cleanupJob = createJob(TaskType.TASK_CLEANUP);
  jobTracker.jobs.put(cleanupJob.getJobID(), cleanupJob);

  // Dummy tracker status with no running tasks.
  TaskTrackerStatus trackerStatus =
      createTaskTrackerStatus(trackers[0], new ArrayList<TaskStatus>());

  // First call validates the map cleanup task, second the reduce one.
  validateNumSlotsUsedForTaskCleanup(trackerStatus);
  validateNumSlotsUsedForTaskCleanup(trackerStatus);

  jobTracker.jobs.clear();
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * Checks that with speculative execution on, the slowest still-running
 * reduces are speculated in order: task 2 first, then task 3.
 *
 * @throws IOException
 */
public void testTaskToSpeculate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(5);
  // NOTE(review): threshold 100 presumably keeps the slow-node heuristic
  // from vetoing speculation here -- confirm against JobInProgress.
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWNODE_THRESHOLD, 100f);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule reduces (the original comment said "maps"); attempts 3 and
  // 4 are deliberately placed on the same tracker
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[3]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advance(20000);
  clock.advanceBySpeculativeLag();
  job.refresh(clock.getTime());
  //we should get a speculative task now
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),2);
  clock.advance(5000);
  job.finishTask(taskAttemptID[5]);

  // after task 2's speculative attempt finishes, task 3 is next
  job.refresh(clock.getTime());
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),3);
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * LATE-style scheduling: of two unfinished maps, the one expected to
 * complete latest is speculated, even though its raw progress rate is
 * higher than the other's.
 *
 * @throws IOException
 */
public void testTaskLATEScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // schedule four of the five maps; tasks 0-2 will finish quickly
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(250000);
  taskAttemptID[4] = job.findMapTask(trackers[3]);
  clock.advanceBySpeculativeLag();
  //by doing the above clock adjustments, we bring the progress rate of
  //taskID 3 lower than 4. For taskID 3, the rate is 85/317000
  //and for taskID 4, the rate is 20/65000. But when we ask for a spec task
  //now, we should get back taskID 4 (since that is expected to complete
  //later than taskID 3).
  job.refresh(clock.getTime());
  job.progressMade(taskAttemptID[3], 0.85f);
  job.progressMade(taskAttemptID[4], 0.20f);
  taskAttemptID[5] = job.findMapTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),4);
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * Verifies that a task which is slow but still expected to complete
 * within the configured map speculative duration is NOT speculated.
 *
 * @throws IOException
 */
public void testFastTaskScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[2];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(2);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setMapSpeculativeDuration(300L);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // a really fast task #1
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);

  // task #2 is slow
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  clock.advanceBySpeculativeLag();
  clock.advance(5000);
  // 65 secs have elapsed since task scheduling
  // set progress so that it will complete within
  // 300 seconds
  job.progressMade(taskAttemptID[1], 0.7f);

  // no new map task should be found; assertNull states the intent
  // directly (the original used assertEquals(actual, null))
  job.refresh(clock.getTime());
  assertNull(job.findMapTask(trackers[2]));
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * Measures the speculative-execution cap: schedules {@code totalTasks}
 * map attempts on trackers[0], finishes {@code numEarlyComplete} of them
 * early, then counts how many speculative attempts trackers[1] can pull
 * before {@code findMapTask} returns null.
 *
 * @param totalTasks       number of map tasks in the job
 * @param numEarlyComplete how many attempts finish before the lag
 * @param slots            slot count configured on the fake jobtracker
 * @return number of speculative map attempts handed out
 * @throws IOException
 */
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(totalTasks);
  conf.setNumReduceTasks(0);
  jobTracker.setNumSlots(slots);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  int i;
  // schedule every map attempt on the first tracker
  for (i = 0; i < totalTasks; i++) {
    taskAttemptID[i] = job.findMapTask(trackers[0]);
  }
  clock.advance(5000);
  for (i = 0; i < numEarlyComplete; i++) {
    job.finishTask(taskAttemptID[i]);
  }

  clock.advanceBySpeculativeLag();

  // remaining attempts report substantial but incomplete progress
  for (i = numEarlyComplete; i < totalTasks; i++) {
    job.progressMade(taskAttemptID[i], 0.85f);
  }
  clock.advance(50000);
  // drain speculative attempts onto trackers[1] until none are offered
  for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
    job.refresh(clock.getTime());
    taskAttemptID[i] = job.findMapTask(trackers[1]);
    clock.advance(2000);
    if (taskAttemptID[i] != null) {
      //add some good progress constantly for the different
      //task-attempts so that
      //the tasktracker doesn't get into the slow trackers category
      job.progressMade(taskAttemptID[i], 0.99f);
    } else {
      break;
    }
  }
  return i;
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * With processing-rate based speculation enabled, the map with the
 * lowest bytes-processed-per-time rate (task 2) should be speculated,
 * even though plain progress rate would have picked task 1.
 *
 * @throws IOException
 */
public void testSlowMapProgressingRate() throws IOException {
  clock.advance(1000);
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(0);
  //use processing rate for speculation
  conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);

  clock.advance(1000);
  job.finishTask(taskAttemptID[0]);
  //if consider the progress rate, we should speculate task 1
  //but if consider the processing rate, which is map_input_bytes/time
  //then we should speculate task 2
  job.processingRate(taskAttemptID[1], Task.Counter.MAP_INPUT_BYTES,
      100000000, 0.1f, TaskStatus.Phase.MAP);
  job.processingRate(taskAttemptID[2], Task.Counter.MAP_INPUT_BYTES,
      1000, 0.5f, TaskStatus.Phase.MAP);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[0]);

  assertEquals(taskAttemptID[3].getTaskID().getId(),2);
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * Checks that with speculative execution on, the slowest still-running
 * reduces are speculated in order: task 2 first, then task 3.
 *
 * @throws IOException
 */
public void testTaskToSpeculate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(5);
  // NOTE(review): threshold 100 presumably keeps the slow-node heuristic
  // from vetoing speculation here -- confirm against JobInProgress.
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWNODE_THRESHOLD, 100f);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule reduces (the original comment said "maps"); attempts 3 and
  // 4 are deliberately placed on the same tracker
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[3]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advance(20000);
  clock.advanceBySpeculativeLag();
  job.refresh(clock.getTime());
  //we should get a speculative task now
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),2);
  clock.advance(5000);
  job.finishTask(taskAttemptID[5]);

  // after task 2's speculative attempt finishes, task 3 is next
  job.refresh(clock.getTime());
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),3);
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * LATE-style scheduling: of two unfinished maps, the one expected to
 * complete latest is speculated, even though its raw progress rate is
 * higher than the other's.
 *
 * @throws IOException
 */
public void testTaskLATEScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // schedule four of the five maps; tasks 0-2 will finish quickly
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(250000);
  taskAttemptID[4] = job.findMapTask(trackers[3]);
  clock.advanceBySpeculativeLag();
  //by doing the above clock adjustments, we bring the progress rate of
  //taskID 3 lower than 4. For taskID 3, the rate is 85/317000
  //and for taskID 4, the rate is 20/65000. But when we ask for a spec task
  //now, we should get back taskID 4 (since that is expected to complete
  //later than taskID 3).
  job.refresh(clock.getTime());
  job.progressMade(taskAttemptID[3], 0.85f);
  job.progressMade(taskAttemptID[4], 0.20f);
  taskAttemptID[5] = job.findMapTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),4);
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * Verifies that a task which is slow but still expected to complete
 * within the configured map speculative duration is NOT speculated.
 *
 * @throws IOException
 */
public void testFastTaskScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[2];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(2);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setMapSpeculativeDuration(300L);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // a really fast task #1
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);

  // task #2 is slow
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  clock.advanceBySpeculativeLag();
  clock.advance(5000);
  // 65 secs have elapsed since task scheduling
  // set progress so that it will complete within
  // 300 seconds
  job.progressMade(taskAttemptID[1], 0.7f);

  // no new map task should be found; assertNull states the intent
  // directly (the original used assertEquals(actual, null))
  job.refresh(clock.getTime());
  assertNull(job.findMapTask(trackers[2]));
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * Measures the speculative-execution cap: schedules {@code totalTasks}
 * map attempts on trackers[0], finishes {@code numEarlyComplete} of them
 * early, then counts how many speculative attempts trackers[1] can pull
 * before {@code findMapTask} returns null.
 *
 * @param totalTasks       number of map tasks in the job
 * @param numEarlyComplete how many attempts finish before the lag
 * @param slots            slot count configured on the fake jobtracker
 * @return number of speculative map attempts handed out
 * @throws IOException
 */
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(totalTasks);
  conf.setNumReduceTasks(0);
  jobTracker.setNumSlots(slots);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  int i;
  // schedule every map attempt on the first tracker
  for (i = 0; i < totalTasks; i++) {
    taskAttemptID[i] = job.findMapTask(trackers[0]);
  }
  clock.advance(5000);
  for (i = 0; i < numEarlyComplete; i++) {
    job.finishTask(taskAttemptID[i]);
  }

  clock.advanceBySpeculativeLag();

  // remaining attempts report substantial but incomplete progress
  for (i = numEarlyComplete; i < totalTasks; i++) {
    job.progressMade(taskAttemptID[i], 0.85f);
  }
  clock.advance(50000);
  // drain speculative attempts onto trackers[1] until none are offered
  for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
    job.refresh(clock.getTime());
    taskAttemptID[i] = job.findMapTask(trackers[1]);
    clock.advance(2000);
    if (taskAttemptID[i] != null) {
      //add some good progress constantly for the different
      //task-attempts so that
      //the tasktracker doesn't get into the slow trackers category
      job.progressMade(taskAttemptID[i], 0.99f);
    } else {
      break;
    }
  }
  return i;
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * With processing-rate based speculation enabled, the map with the
 * lowest bytes-processed-per-time rate (task 2) should be speculated,
 * even though plain progress rate would have picked task 1.
 *
 * @throws IOException
 */
public void testSlowMapProgressingRate() throws IOException {
  clock.advance(1000);
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(0);
  //use processing rate for speculation
  conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);

  clock.advance(1000);
  job.finishTask(taskAttemptID[0]);
  //if consider the progress rate, we should speculate task 1
  //but if consider the processing rate, which is map_input_bytes/time
  //then we should speculate task 2
  job.processingRate(taskAttemptID[1], Task.Counter.MAP_INPUT_BYTES,
      100000000, 0.1f, TaskStatus.Phase.MAP);
  job.processingRate(taskAttemptID[2], Task.Counter.MAP_INPUT_BYTES,
      1000, 0.5f, TaskStatus.Phase.MAP);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[0]);

  assertEquals(taskAttemptID[3].getTaskID().getId(),2);
}
项目:mapreduce-fork    文件:TestLostTracker.java   
/**
 * Verifies that when a tasktracker is declared lost, the map task it had
 * already completed is rescheduled on a newly joined tracker.
 * @throws IOException
 */
public void testLostTracker() throws IOException {
  // Tracker 0 makes first contact with the JobTracker.
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);

  JobConf conf = new JobConf();
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // Tracker 0 is assigned the single map task and completes it.
  TaskAttemptID[] tid = new TaskAttemptID[2];
  tid[0] = job.findMapTask(trackers[0]);
  job.finishTask(tid[0]);

  // Advance the clock far enough for tracker 0 to be considered lost,
  // then let the JobTracker expire it.
  clock.advance(8 * 1000);
  jobTracker.checkExpiredTrackers();

  // Tracker 1 joins and should be handed the lost tracker's map task.
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
  tid[1] = job.findMapTask(trackers[1]);

  assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]);
  assertEquals("Task ID of reassigned map task does not match",
      tid[0].getTaskID().toString(), tid[1].getTaskID().toString());

  job.finishTask(tid[1]);
}
项目:mapreduce-fork    文件:TestSpeculativeExecution.java   
/**
 * Checks that the slowest running reduces are speculated in order
 * (task 2 then task 3), and that the jobtracker instrumentation counts
 * the speculative attempts.
 *
 * @throws IOException
 */
public void testTaskToSpeculate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(5);
  conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule reduces (the original comment said "maps"); attempts 3 and
  // 4 are deliberately placed on the same tracker
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[3]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advance(20000);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),2);
  clock.advance(5000);
  job.finishTask(taskAttemptID[5]);

  // after task 2's speculative attempt finishes, task 3 is next
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),3);

  // Verify total speculative tasks by jobtracker instrumentation
  // NOTE(review): counts include attempts from earlier tests sharing
  // fakeInst -- confirm test-ordering assumptions.
  assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
  assertEquals("Total speculative reduces", 3,
               fakeInst.numSpeculativeReduces);
  LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
  LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
项目:mapreduce-fork    文件:TestSpeculativeExecution.java   
/**
 * LATE-style scheduling: of two unfinished maps, the one expected to
 * complete latest is speculated; also checks the instrumentation's
 * speculative-task totals.
 *
 * @throws IOException
 */
public void testTaskLATEScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // schedule four of the five maps; tasks 0-2 will finish quickly
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(250000);
  taskAttemptID[4] = job.findMapTask(trackers[3]);
  clock.advanceBySpeculativeLag();
  //by doing the above clock adjustments, we bring the progress rate of 
  //taskID 3 lower than 4. For taskID 3, the rate is 85/317000
  //and for taskID 4, the rate is 20/65000. But when we ask for a spec task
  //now, we should get back taskID 4 (since that is expected to complete
  //later than taskID 3).
  job.progressMade(taskAttemptID[3], 0.85f);
  job.progressMade(taskAttemptID[4], 0.20f);
  taskAttemptID[5] = job.findMapTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(),4);
  // Verify total speculative tasks by jobtracker instrumentation
  // NOTE(review): counts include attempts from earlier tests sharing
  // fakeInst -- confirm test-ordering assumptions.
  assertEquals("Total speculative maps", 2, fakeInst.numSpeculativeMaps);
  assertEquals("Total speculative reduces", 3,
               fakeInst.numSpeculativeReduces);
  LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
  LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
项目:mapreduce-fork    文件:TestSpeculativeExecution.java   
/**
 * Measures the speculative-execution cap: schedules {@code totalTasks}
 * map attempts on trackers[0], finishes {@code numEarlyComplete} of them
 * early, then counts how many speculative attempts trackers[1] can pull
 * before {@code findMapTask} returns null.
 *
 * @param totalTasks       number of map tasks in the job
 * @param numEarlyComplete how many attempts finish before the lag
 * @param slots            slot count configured on the fake jobtracker
 * @return number of speculative map attempts handed out
 * @throws IOException
 */
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(totalTasks);
  conf.setNumReduceTasks(0);
  jobTracker.setNumSlots(slots);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  int i;
  // schedule every map attempt on the first tracker
  for (i = 0; i < totalTasks; i++) {
    taskAttemptID[i] = job.findMapTask(trackers[0]);
  }
  clock.advance(5000);
  for (i = 0; i < numEarlyComplete; i++) {
    job.finishTask(taskAttemptID[i]);
  }

  clock.advanceBySpeculativeLag();

  // remaining attempts report substantial but incomplete progress
  for (i = numEarlyComplete; i < totalTasks; i++) {
    job.progressMade(taskAttemptID[i], 0.85f);
  }
  clock.advance(50000);
  // drain speculative attempts onto trackers[1] until none are offered
  for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
    taskAttemptID[i] = job.findMapTask(trackers[1]);
    clock.advance(2000);
    if (taskAttemptID[i] != null) {
      //add some good progress constantly for the different
      //task-attempts so that
      //the tasktracker doesn't get into the slow trackers category
      job.progressMade(taskAttemptID[i], 0.99f);
    } else {
      break;
    }
  }
  return i;
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * Verifies that runningMaps()/runningReduces() counts are decremented
 * when a running (non-speculative) attempt fails while a speculative
 * attempt for another task is in flight.
 *
 * @throws IOException
 */
public void testRunningTaskCountWithSpeculation() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  //Check for runningMap counts first
  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);

  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advanceBySpeculativeLag();

  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  job.refresh(clock.getTime());
  int oldRunningMap = job.runningMaps();
  LOG.info("No of running maps before fail was " + oldRunningMap);
  job.failTask(taskAttemptID[2]);
  job.refresh(clock.getTime());
  // JUnit convention: expected value first, actual second (the original
  // had them reversed, producing misleading failure output).
  assertEquals(
    "Running maps count should be updated from " + oldRunningMap + " to " +
      (oldRunningMap - 1), oldRunningMap - 1, job.runningMaps());
  LOG.info(" Job running maps after fail " + job.runningMaps());

  clock.advance(5000);
  job.finishTask(taskAttemptID[3]);

  //check for runningReduce count.
  taskAttemptID[4] = job.findReduceTask(trackers[0]);
  taskAttemptID[5] = job.findReduceTask(trackers[1]);
  taskAttemptID[6] = job.findReduceTask(trackers[2]);

  clock.advance(5000);
  job.finishTask(taskAttemptID[4]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[5]);

  job.refresh(clock.getTime());
  clock.advanceBySpeculativeLag();
  taskAttemptID[7] = job.findReduceTask(trackers[4]);

  job.refresh(clock.getTime());
  int oldRunningReduces = job.runningReduces();
  job.failTask(taskAttemptID[6]);
  job.refresh(clock.getTime());
  LOG.info(
    " No of running Reduces before fail " + oldRunningReduces);
  LOG.info(
    " No of running reduces after fail " + job.runningReduces());
  assertEquals(
    "Running reduces count should be updated from " + oldRunningReduces +
      " to " + (oldRunningReduces - 1), oldRunningReduces - 1,
    job.runningReduces());

  job.finishTask(taskAttemptID[7]);
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * Verifies isSlowTracker(): a tracker whose tasks finish far later than
 * the others' is flagged as slow, while a fast tracker is not.
 *
 * @throws IOException
 */
public void testIsSlowTracker() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(10);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  //schedule some tasks
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[0]);
  taskAttemptID[2] = job.findMapTask(trackers[0]);
  taskAttemptID[3] = job.findMapTask(trackers[1]);
  taskAttemptID[4] = job.findMapTask(trackers[1]);
  taskAttemptID[5] = job.findMapTask(trackers[1]);
  taskAttemptID[6] = job.findMapTask(trackers[2]);
  taskAttemptID[7] = job.findMapTask(trackers[2]);
  taskAttemptID[8] = job.findMapTask(trackers[2]);
  clock.advance(1000);
  //Some tasks finish in 1 second (on trackers[0])
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(1000);
  //Some tasks finish in 2 second (on trackers[1])
  job.finishTask(taskAttemptID[3]);
  job.finishTask(taskAttemptID[4]);
  job.finishTask(taskAttemptID[5]);
  // use boolean-specific asserts: clearer intent and failure output than
  // assertEquals with the actual value in the "expected" position
  assertFalse("Tracker "+ trackers[0] + " expected to be not slow ",
      job.isSlowTracker(trackers[0]));
  clock.advance(100000);
  //After a long time, some tasks finished on trackers[2]
  job.finishTask(taskAttemptID[6]);
  job.finishTask(taskAttemptID[7]);
  job.finishTask(taskAttemptID[8]);
  job.refresh(clock.getTime());
  assertTrue("Tracker "+ trackers[2] + " expected to be slow ",
      job.isSlowTracker(trackers[2]));
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * Tests that when the spread of progress rates makes stddev exceed the
 * mean, the stddev/mean ratio cap (SPECULATIVE_STDDEVMEANRATIO_MAX)
 * still permits a speculative attempt to be scheduled.
 *
 * @throws IOException
 */
public void testTaskSpeculationStddevCap() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setFloat(JobInProgress.SPECULATIVE_STDDEVMEANRATIO_MAX, 0.33f);
  conf.setNumMapTasks(7);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // all but one tasks start off
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[0]);
  taskAttemptID[4] = job.findMapTask(trackers[1]);
  taskAttemptID[5] = job.findMapTask(trackers[2]);

  // 3 tasks finish really fast in 15s
  clock.advance (15 * 1000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);

  // advance to 600s and schedule last mapper
  clock.advance (585 * 1000);
  taskAttemptID[6] = job.findMapTask(trackers[0]);

  // advance to 700s and report progress
  clock.advance (10 * 60 * 1000);

  // set progress rates
  job.progressMade(taskAttemptID[3], 0.2f);
  job.progressMade(taskAttemptID[4], 0.5f);
  job.progressMade(taskAttemptID[5], 0.6f);
  job.progressMade(taskAttemptID[6], 0.02f);

  // the progress has been set in such a way that
  // stddev > mean. now we depend on stddev capping
  // for speculation.

  job.refresh(clock.getTime());
  taskAttemptID[7] = job.findMapTask(trackers[1]);

  // a speculative attempt MUST be scheduled here; the original code's
  // comment claimed the opposite of what its if-null-then-fail checked,
  // so assert the intent directly
  Assert.assertNotNull("Expected a speculative map attempt",
      taskAttemptID[7]);
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * Verifies that the final remaining map and, later, the final remaining
 * reduce attempt are both speculated once the speculative lag elapses.
 */
public void testSpeculateLastTask() throws Exception {
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  TaskAttemptID[] attempts = new TaskAttemptID[8];
  attempts[0] = job.findMapTask(trackers[0]);
  attempts[1] = job.findMapTask(trackers[1]);
  attempts[2] = job.findMapTask(trackers[2]);

  // let the speculative lag pass, then finish two of the three maps
  clock.advanceBySpeculativeLag();
  job.finishTask(attempts[0]);
  job.finishTask(attempts[1]);

  // the last unfinished map should now be speculated
  job.refresh(clock.getTime());
  attempts[3] = job.findMapTask(trackers[3]);
  Assert.assertNotNull(attempts[3]);

  job.finishTask(attempts[2]);
  job.finishTask(attempts[3]);

  attempts[4] = job.findReduceTask(trackers[0]);
  attempts[5] = job.findReduceTask(trackers[1]);
  attempts[6] = job.findReduceTask(trackers[2]);

  // same story for the reduces: finish two, leave one straggler
  clock.advanceBySpeculativeLag();
  job.finishTask(attempts[4]);
  job.finishTask(attempts[5]);

  // the last unfinished reduce should now be speculated
  job.refresh(clock.getTime());
  attempts[7] = job.findReduceTask(trackers[3]);
  Assert.assertNotNull(attempts[7]);

  job.finishTask(attempts[6]);
  job.finishTask(attempts[7]);
}
项目:hadoop-EAR    文件:TestSpeculativeExecution.java   
/**
 * Verifies that with processing-rate based speculation enabled, the
 * reduce with the slowest processing rate (task 1) is speculated rather
 * than the task with the slowest overall progress (task 0).
 */
public void testSlowReduceProgressingRate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(4);
  conf.setNumReduceTasks(4);
  //use processing rate for speculation
  conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  //schedule reduces
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);

  clock.advance(1000);

  //task 0 just starts copying, while task 1, 2, 3 are already in the reducing
  //phase. If we compared the progress rate, then we should speculate 0.
  //However, by comparing the processing rate in the copy phase, among all 4
  //tasks, task 0 is fast, and we should not speculate it.
  //for task 1, 2, 3, they are all in the reducing phase, with same progress,
  //however, task 1 has smaller processing rate(the statistics of the reduce
  //phase for all the tasks will also include statistics for task 0, whose
  //processing rate is 0)
  job.finishCopy(taskAttemptID[1], clock.getTime(), 10000);
  job.finishCopy(taskAttemptID[2], clock.getTime(), 10000);
  job.finishCopy(taskAttemptID[3], clock.getTime(), 10000);
  clock.advance(1000);
  job.finishSort(taskAttemptID[1], clock.getTime());
  job.finishSort(taskAttemptID[2], clock.getTime());
  job.finishSort(taskAttemptID[3], clock.getTime());
  job.processingRate(taskAttemptID[0], Task.Counter.REDUCE_SHUFFLE_BYTES,
      100000000, 0.1f, TaskStatus.Phase.SHUFFLE);
  job.processingRate(taskAttemptID[1], Task.Counter.REDUCE_INPUT_BYTES,
      1000, 0.8f, TaskStatus.Phase.REDUCE);
  job.processingRate(taskAttemptID[2], Task.Counter.REDUCE_INPUT_BYTES,
      100000000, 0.8f, TaskStatus.Phase.REDUCE);
  job.processingRate(taskAttemptID[3], Task.Counter.REDUCE_INPUT_BYTES,
      100000000, 0.8f, TaskStatus.Phase.REDUCE);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[4] = job.findReduceTask(trackers[4]);
  // JUnit convention: expected value first, actual second
  assertEquals("Wrong reduce task speculated",
      1, taskAttemptID[4].getTaskID().getId());
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * Checks that the running map/reduce counters are decremented correctly
 * when a (speculated) attempt fails while its speculative sibling runs.
 */
public void testRunningTaskCountWithSpeculation() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  //Check for runningMap counts first
  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);

  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advanceBySpeculativeLag();

  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  job.refresh(clock.getTime());
  int oldRunningMap = job.runningMaps();
  LOG.info("No of running maps before fail was " + oldRunningMap);
  job.failTask(taskAttemptID[2]);
  job.refresh(clock.getTime());
  // JUnit convention: expected value first, actual second
  assertEquals(
    "Running maps count should be updated from " + oldRunningMap + " to " +
      (oldRunningMap - 1), oldRunningMap - 1, job.runningMaps());
  LOG.info(" Job running maps after fail " + job.runningMaps());

  clock.advance(5000);
  job.finishTask(taskAttemptID[3]);

  //check for runningReduce count.
  taskAttemptID[4] = job.findReduceTask(trackers[0]);
  taskAttemptID[5] = job.findReduceTask(trackers[1]);
  taskAttemptID[6] = job.findReduceTask(trackers[2]);

  clock.advance(5000);
  job.finishTask(taskAttemptID[4]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[5]);

  job.refresh(clock.getTime());
  clock.advanceBySpeculativeLag();
  taskAttemptID[7] = job.findReduceTask(trackers[4]);

  job.refresh(clock.getTime());
  int oldRunningReduces = job.runningReduces();
  job.failTask(taskAttemptID[6]);
  job.refresh(clock.getTime());
  LOG.info(
    " No of running Reduces before fail " + oldRunningReduces);
  LOG.info(
    " No of runing reduces after fail " + job.runningReduces());
  // JUnit convention: expected value first, actual second
  assertEquals(
    "Running reduces count should be updated from " + oldRunningReduces +
      " to " + (oldRunningReduces - 1), oldRunningReduces - 1,
    job.runningReduces());

  job.finishTask(taskAttemptID[7]);
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * Verifies slow-tracker detection: trackers that finish their tasks
 * quickly are not flagged, while a tracker whose tasks take far longer
 * than the cluster average is reported slow.
 */
public void testIsSlowTracker() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(10);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  //schedule some tasks
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[0]);
  taskAttemptID[2] = job.findMapTask(trackers[0]);
  taskAttemptID[3] = job.findMapTask(trackers[1]);
  taskAttemptID[4] = job.findMapTask(trackers[1]);
  taskAttemptID[5] = job.findMapTask(trackers[1]);
  taskAttemptID[6] = job.findMapTask(trackers[2]);
  taskAttemptID[7] = job.findMapTask(trackers[2]);
  taskAttemptID[8] = job.findMapTask(trackers[2]);
  clock.advance(1000);
  //Some tasks finish in 1 second (on trackers[0])
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(1000);
  //Some tasks finish in 2 second (on trackers[1])
  job.finishTask(taskAttemptID[3]);
  job.finishTask(taskAttemptID[4]);
  job.finishTask(taskAttemptID[5]);
  // boolean checks: use assertFalse/assertTrue instead of
  // assertEquals with reversed expected/actual arguments
  assertFalse("Tracker "+ trackers[0] + " expected to be not slow ",
      job.isSlowTracker(trackers[0]));
  clock.advance(100000);
  //After a long time, some tasks finished on trackers[2]
  job.finishTask(taskAttemptID[6]);
  job.finishTask(taskAttemptID[7]);
  job.finishTask(taskAttemptID[8]);
  job.refresh(clock.getTime());
  assertTrue("Tracker "+ trackers[2] + " expected to be slow ",
      job.isSlowTracker(trackers[2]));
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * Tests the stddev cap on speculative execution: the progress-rate
 * stddev is capped at SPECULATIVE_STDDEVMEANRATIO_MAX times the mean,
 * so a very slow task is still speculated even when the raw stddev of
 * the progress rates exceeds their mean.
 */
public void testTaskSpeculationStddevCap() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setFloat(JobInProgress.SPECULATIVE_STDDEVMEANRATIO_MAX, 0.33f);
  conf.setNumMapTasks(7);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // all but one of the seven map tasks start off
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[0]);
  taskAttemptID[4] = job.findMapTask(trackers[1]);
  taskAttemptID[5] = job.findMapTask(trackers[2]);

  // 3 tasks finish really fast in 15s
  clock.advance (15 * 1000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);

  // advance to 600s and schedule the last mapper
  clock.advance (585 * 1000);
  taskAttemptID[6] = job.findMapTask(trackers[0]);

  // advance another 600s and report progress
  clock.advance (10 * 60 * 1000);

  // set progress rates
  job.progressMade(taskAttemptID[3], 0.2f);
  job.progressMade(taskAttemptID[4], 0.5f);
  job.progressMade(taskAttemptID[5], 0.6f);
  job.progressMade(taskAttemptID[6], 0.02f);

  // the progress has been set in such a way that stddev > mean;
  // speculation therefore depends on the stddev capping configured above
  job.refresh(clock.getTime());
  taskAttemptID[7] = job.findMapTask(trackers[1]);

  // with the cap in effect, a speculative attempt for the laggard
  // (taskAttemptID[6]) must have been scheduled
  Assert.assertNotNull("Expected a speculative map attempt", taskAttemptID[7]);
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * Verifies that the final remaining map and, later, the final remaining
 * reduce attempt are both speculated once the speculative lag elapses.
 */
public void testSpeculateLastTask() throws Exception {
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  TaskAttemptID[] attempts = new TaskAttemptID[8];
  attempts[0] = job.findMapTask(trackers[0]);
  attempts[1] = job.findMapTask(trackers[1]);
  attempts[2] = job.findMapTask(trackers[2]);

  // let the speculative lag pass, then finish two of the three maps
  clock.advanceBySpeculativeLag();
  job.finishTask(attempts[0]);
  job.finishTask(attempts[1]);

  // the last unfinished map should now be speculated
  job.refresh(clock.getTime());
  attempts[3] = job.findMapTask(trackers[3]);
  Assert.assertNotNull(attempts[3]);

  job.finishTask(attempts[2]);
  job.finishTask(attempts[3]);

  attempts[4] = job.findReduceTask(trackers[0]);
  attempts[5] = job.findReduceTask(trackers[1]);
  attempts[6] = job.findReduceTask(trackers[2]);

  // same story for the reduces: finish two, leave one straggler
  clock.advanceBySpeculativeLag();
  job.finishTask(attempts[4]);
  job.finishTask(attempts[5]);

  // the last unfinished reduce should now be speculated
  job.refresh(clock.getTime());
  attempts[7] = job.findReduceTask(trackers[3]);
  Assert.assertNotNull(attempts[7]);

  job.finishTask(attempts[6]);
  job.finishTask(attempts[7]);
}
项目:RDFS    文件:TestSpeculativeExecution.java   
/**
 * Verifies that with processing-rate based speculation enabled, the
 * reduce with the slowest processing rate (task 1) is speculated rather
 * than the task with the slowest overall progress (task 0).
 */
public void testSlowReduceProgressingRate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(4);
  conf.setNumReduceTasks(4);
  //use processing rate for speculation
  conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  //schedule reduces
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);

  clock.advance(1000);

  //task 0 just starts copying, while task 1, 2, 3 are already in the reducing
  //phase. If we compared the progress rate, then we should speculate 0.
  //However, by comparing the processing rate in the copy phase, among all 4
  //tasks, task 0 is fast, and we should not speculate it.
  //for task 1, 2, 3, they are all in the reducing phase, with same progress,
  //however, task 1 has smaller processing rate(the statistics of the reduce
  //phase for all the tasks will also include statistics for task 0, whose
  //processing rate is 0)
  job.finishCopy(taskAttemptID[1], clock.getTime(), 10000);
  job.finishCopy(taskAttemptID[2], clock.getTime(), 10000);
  job.finishCopy(taskAttemptID[3], clock.getTime(), 10000);
  clock.advance(1000);
  job.finishSort(taskAttemptID[1], clock.getTime());
  job.finishSort(taskAttemptID[2], clock.getTime());
  job.finishSort(taskAttemptID[3], clock.getTime());
  job.processingRate(taskAttemptID[0], Task.Counter.REDUCE_SHUFFLE_BYTES,
      100000000, 0.1f, TaskStatus.Phase.SHUFFLE);
  job.processingRate(taskAttemptID[1], Task.Counter.REDUCE_INPUT_BYTES,
      1000, 0.8f, TaskStatus.Phase.REDUCE);
  job.processingRate(taskAttemptID[2], Task.Counter.REDUCE_INPUT_BYTES,
      100000000, 0.8f, TaskStatus.Phase.REDUCE);
  job.processingRate(taskAttemptID[3], Task.Counter.REDUCE_INPUT_BYTES,
      100000000, 0.8f, TaskStatus.Phase.REDUCE);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[4] = job.findReduceTask(trackers[4]);
  // JUnit convention: expected value first, actual second
  assertEquals("Wrong reduce task speculated",
      1, taskAttemptID[4].getTaskID().getId());
}
项目:mapreduce-fork    文件:TestTrackerReservation.java   
/**
 * Test case to test if task tracker reservation.
 * <ol>
 * <li>Run a cluster with 3 trackers.</li>
 * <li>Submit a job which reserves all the slots in two
 * trackers.</li>
 * <li>Run the job on another tracker which has 
 * no reservations</li>
 * <li>Finish the job and observe the reservations are
 * successfully canceled</li>
 * </ol>
 * 
 * @throws Exception
 */
public void testTaskTrackerReservation() throws Exception {
  JobConf conf = new JobConf();

  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  conf.setSpeculativeExecution(false);

  conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);

  //Set task tracker objects for reservation.
  TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
  TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
  TaskTracker tt3 = jobTracker.getTaskTracker(trackers[2]);
  TaskTrackerStatus status1 = new TaskTrackerStatus(
      trackers[0],JobInProgress.convertTrackerNameToHostName(
          trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
  TaskTrackerStatus status2 = new TaskTrackerStatus(
      trackers[1],JobInProgress.convertTrackerNameToHostName(
          trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
  // status3 describes trackers[2]; it previously duplicated
  // trackers[1] due to a copy-paste error
  TaskTrackerStatus status3 = new TaskTrackerStatus(
      trackers[2],JobInProgress.convertTrackerNameToHostName(
          trackers[2]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
  tt1.setStatus(status1);
  tt2.setStatus(status2);
  tt3.setStatus(status3);

  FakeJobInProgress fjob = new FakeJobInProgress(conf, jobTracker);
  fjob.setClusterSize(3);
  fjob.initTasks();

  // reserve every slot on trackers 0 and 2
  tt1.reserveSlots(TaskType.MAP, fjob, 2);
  tt1.reserveSlots(TaskType.REDUCE, fjob, 2);
  tt3.reserveSlots(TaskType.MAP, fjob, 2);
  tt3.reserveSlots(TaskType.REDUCE, fjob, 2);

  assertEquals("Trackers not reserved for the job : maps", 
      2, fjob.getNumReservedTaskTrackersForMaps());
  assertEquals("Trackers not reserved for the job : reduces", 
      2, fjob.getNumReservedTaskTrackersForReduces());
  ClusterMetrics metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match",
        4, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
        4, metrics.getReservedReduceSlots());

  // run the job on the unreserved tracker
  TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
  TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);

  fjob.finishTask(mTid);
  fjob.finishTask(rTid);

  // JUnit convention: expected value first, actual second
  assertEquals("Job did not complete successfully",
      JobStatus.SUCCEEDED, fjob.getStatus().getRunState());

  assertEquals("Reservation for the job not released: Maps", 
      0, fjob.getNumReservedTaskTrackersForMaps());
  assertEquals("Reservation for the job not released : Reduces", 
      0, fjob.getNumReservedTaskTrackersForReduces());
  metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match",
      0, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
      0, metrics.getReservedReduceSlots());
}
项目:mapreduce-fork    文件:TestLostTracker.java   
/**
 * Checks that a tracker lost before it could be blacklisted is handled
 * cleanly: its finished map task is re-run on another tracker, the job
 * still succeeds, and no stale blacklist entry remains.
 */
public void testLostTrackerBeforeBlacklisting() throws Exception {
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
  TaskAttemptID[] attempts = new TaskAttemptID[3];
  JobConf conf = new JobConf();
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  conf.set(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, "1");
  conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  job.setClusterSize(4);

  // the single map task goes to tracker 0 and completes
  attempts[0] = job.findMapTask(trackers[0]);
  job.finishTask(attempts[0]);

  // exactly one tracker should be active so far
  assertEquals("Active tracker count mismatch", 
               1, jobTracker.getClusterStatus(false).getTaskTrackers());

  // expire tracker 0
  clock.advance(1100);
  jobTracker.checkExpiredTrackers();
  assertFalse("Tracker 0 not lost", 
      jobTracker.getClusterStatus(false).getActiveTrackerNames()
                .contains(trackers[0]));

  // no active trackers remain
  assertEquals("Active tracker count mismatch", 
               0, jobTracker.getClusterStatus(false).getTaskTrackers());

  // bring tracker 1 online
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);

  // the map task from the lost tracker should be handed to tracker 1
  attempts[1] = job.findMapTask(trackers[1]);
  assertNotNull("Map Task from Lost Tracker did not get reassigned", attempts[1]);
  assertEquals("Task ID of reassigned map task does not match",
      attempts[0].getTaskID().toString(), attempts[1].getTaskID().toString());

  // run the remaining work to completion on tracker 1
  job.finishTask(attempts[1]);
  attempts[2] = job.findReduceTask(trackers[1]);
  job.finishTask(attempts[2]);

  assertEquals("Job not successful", 
               JobStatus.SUCCEEDED, job.getStatus().getRunState());

  // tracker 1 is the sole active tracker...
  assertEquals("Active tracker count mismatch", 
               1, jobTracker.getClusterStatus(false).getTaskTrackers());
  // ...and the lost tracker must not be counted as blacklisted
  assertEquals("Blacklisted tracker count mismatch", 
              0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
}
项目:mapreduce-fork    文件:TestLostTracker.java   
/**
 * Checks that a tracker which is first blacklisted (after a task
 * failure) and then expires is fully removed: it leaves both the
 * active-tracker list and the blacklist.
 */
public void testLostTrackerAfterBlacklisting() throws Exception {
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
  clock.advance(600);
  TaskAttemptID[] attempts = new TaskAttemptID[2];
  JobConf conf = new JobConf();
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(0);
  conf.set(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, "1");
  conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  job.setClusterSize(4);

  // only tracker 0 has joined so far
  assertEquals("Active tracker count mismatch", 
               1, jobTracker.taskTrackers().size());

  // tracker 0 gets the map task, which fails there
  attempts[0] = job.findMapTask(trackers[0]);
  job.failTask(attempts[0]);

  // tracker 1 joins the cluster
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
  assertEquals("Active tracker count mismatch", 
               2, jobTracker.taskTrackers().size());

  // the retry runs on tracker 1 and finishes the job
  attempts[1] = job.findMapTask(trackers[1]);
  job.finishTask(attempts[1]);

  assertEquals("Job not successful", 
               JobStatus.SUCCEEDED, job.getStatus().getRunState());

  // the single failure should have blacklisted tracker 0
  assertTrue("Tracker 0 not blacklisted", 
             jobTracker.getBlacklistedTrackers()[0].getTaskTrackerName()
               .equals(trackers[0]));
  assertEquals("Active tracker count mismatch", 
               2, jobTracker.taskTrackers().size());
  assertEquals("Blacklisted tracker count mismatch", 
              1, jobTracker.getClusterStatus(false).getBlacklistedTrackers());

  // push the clock past the expiry interval so tracker 0 is lost
  clock.advance(500);
  jobTracker.checkExpiredTrackers();

  assertFalse("Tracker 0 not lost", 
          jobTracker.getClusterStatus(false).getActiveTrackerNames()
                    .contains(trackers[0]));

  // losing the tracker removes it from the jobtracker entirely...
  assertEquals("Active tracker count mismatch", 
               1, jobTracker.taskTrackers().size());
  // ...including its blacklist entry
  assertEquals("Blacklisted tracker count mismatch", 
              0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());

}
项目:mapreduce-fork    文件:TestSpeculativeExecution.java   
/**
 * Checks that the running map/reduce counters are decremented correctly
 * when a (speculated) attempt fails, and that the instrumentation
 * records one speculative map and one speculative reduce.
 */
public void testRunningTaskCountWithSpeculation() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  //Check for runningMap counts first
  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);

  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advanceBySpeculativeLag();

  //we should get a speculative task now
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  int oldRunningMap = job.runningMaps();
  LOG.info("No of running maps before fail was " + oldRunningMap);
  job.failTask(taskAttemptID[2]);
  // JUnit convention: expected value first, actual second
  assertEquals(
    "Running maps count should be updated from " + oldRunningMap + " to " +
      (oldRunningMap - 1), oldRunningMap - 1, job.runningMaps());
  LOG.info(" Job running maps after fail " + job.runningMaps());

  clock.advance(5000);
  job.finishTask(taskAttemptID[3]);

  //check for runningReduce count.
  taskAttemptID[4] = job.findReduceTask(trackers[0]);
  taskAttemptID[5] = job.findReduceTask(trackers[1]);
  taskAttemptID[6] = job.findReduceTask(trackers[2]);

  clock.advance(5000);
  job.finishTask(taskAttemptID[4]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[5]);

  clock.advanceBySpeculativeLag();
  taskAttemptID[7] = job.findReduceTask(trackers[4]);

  int oldRunningReduces = job.runningReduces();
  job.failTask(taskAttemptID[6]);
  LOG.info(
    " No of running Reduces before fail " + oldRunningReduces);
  LOG.info(
    " No of runing reduces after fail " + job.runningReduces());
  // JUnit convention: expected value first, actual second
  assertEquals(
    "Running reduces count should be updated from " + oldRunningReduces +
      " to " + (oldRunningReduces - 1), oldRunningReduces - 1,
    job.runningReduces());
  // Verify total speculative tasks by jobtracker instrumentation
  assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
  assertEquals("Total speculative reduces", 1,
               fakeInst.numSpeculativeReduces);
  LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
  LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);

  job.finishTask(taskAttemptID[7]);
}
项目:mapreduce-fork    文件:TestSpeculativeExecution.java   
/**
 * Verifies slow-tracker detection: fast trackers are not flagged, a
 * tracker whose tasks take far longer than the cluster average is, and
 * the instrumentation counts the resulting speculative attempts.
 */
public void testIsSlowTracker() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(10);
  conf.setNumReduceTasks(0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);    
  job.initTasks();
  //schedule some tasks
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[0]);
  taskAttemptID[2] = job.findMapTask(trackers[0]);
  taskAttemptID[3] = job.findMapTask(trackers[1]);
  taskAttemptID[4] = job.findMapTask(trackers[1]);
  taskAttemptID[5] = job.findMapTask(trackers[1]);
  taskAttemptID[6] = job.findMapTask(trackers[2]);
  taskAttemptID[7] = job.findMapTask(trackers[2]);
  taskAttemptID[8] = job.findMapTask(trackers[2]);
  clock.advance(1000);
  //Some tasks finish in 1 second (on trackers[0])
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(1000);
  //Some tasks finish in 2 second (on trackers[1])
  job.finishTask(taskAttemptID[3]);
  job.finishTask(taskAttemptID[4]);
  job.finishTask(taskAttemptID[5]);
  // boolean checks: use assertFalse/assertTrue instead of
  // assertEquals with reversed expected/actual arguments
  assertFalse("Tracker "+ trackers[0] + " expected to be not slow ",
      job.isSlowTracker(trackers[0]));
  clock.advance(100000);
  //After a long time, some tasks finished on trackers[2]
  job.finishTask(taskAttemptID[6]);
  job.finishTask(taskAttemptID[7]);
  job.finishTask(taskAttemptID[8]);
  assertTrue("Tracker "+ trackers[2] + " expected to be slow ",
      job.isSlowTracker(trackers[2]));
  // Verify total speculative tasks by jobtracker instrumentation
  assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
  assertEquals("Total speculative reduces", 1,
               fakeInst.numSpeculativeReduces);
  LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
  LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
项目:mapreduce-fork    文件:TestClusterStatus.java   
/**
 * Verifies that reserved map/reduce slots reported in the cluster
 * metrics grow as heartbeats re-reserve slots and drop back to zero
 * once the job completes.
 */
public void testReservedSlots() throws Exception {
  Configuration conf = mr.createJobConf();
  conf.setInt(JobContext.NUM_MAPS, 1);

  Job job = Job.getInstance(cluster, conf);
  job.setNumReduceTasks(1);
  job.setSpeculativeExecution(false);
  job.setJobSetupCleanupNeeded(false);

  //Set task tracker objects for reservation.
  TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
  TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
  TaskTrackerStatus status1 = new TaskTrackerStatus(
      trackers[0],JobInProgress.convertTrackerNameToHostName(
          trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
  TaskTrackerStatus status2 = new TaskTrackerStatus(
      trackers[1],JobInProgress.convertTrackerNameToHostName(
          trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
  tt1.setStatus(status1);
  tt2.setStatus(status2);

  fakeJob = new FakeJobInProgress(new JobConf(job.getConfiguration()),
                  jobTracker);
  fakeJob.setClusterSize(3);
  fakeJob.initTasks();

  // first heartbeat round reserves 2 map + 2 reduce slots
  FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
    true, trackers[0], responseId);
  FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
    true, trackers[1], responseId);
  responseId++; 
  ClusterMetrics metrics = cluster.getClusterStatus();
  assertEquals("reserved map slots do not match", 
    2, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match", 
    2, metrics.getReservedReduceSlots());

  // redo to test re-reservations.
  FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
      true, trackers[0], responseId);
  FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
      true, trackers[1], responseId);
  responseId++; 
  metrics = cluster.getClusterStatus();
  assertEquals("reserved map slots do not match", 
      4, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match", 
      4, metrics.getReservedReduceSlots());

  // run the job to completion on tracker 1
  TaskAttemptID mTid = fakeJob.findMapTask(trackers[1]);
  TaskAttemptID rTid = fakeJob.findReduceTask(trackers[1]);

  fakeJob.finishTask(mTid);
  fakeJob.finishTask(rTid);

  // JUnit convention: expected value first, actual second
  assertEquals("Job did not complete successfully",
    JobStatus.SUCCEEDED, fakeJob.getStatus().getRunState());
  // all reservations must have been released
  metrics = cluster.getClusterStatus();
  assertEquals("reserved map slots do not match", 
    0, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match", 
    0, metrics.getReservedReduceSlots());
}