Java 类org.apache.hadoop.mapred.MapTaskAttemptImpl 实例源码

项目:hadoop    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:hadoop    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:aliyun-oss-hadoop-fs    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:big-c    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:big-c    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:hadoop-plus    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:hadoop-plus    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:FlexMap    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {

  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,partition,taskSplitMetaInfo,conf,taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:FlexMap    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:hops    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:hops    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:hops    文件:TestTaskAttempt.java   
private void containerKillBeforeAssignment(boolean scheduleAttempt)
    throws Exception {
  MockEventHandler eventHandler = new MockEventHandler();
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);

  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, mock(Path.class), 1,
          mock(TaskSplitMetaInfo.class), new JobConf(),
          mock(TaskAttemptListener.class), mock(Token.class),
          new Credentials(), new SystemClock(),
          mock(AppContext.class));
  if (scheduleAttempt) {
    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
        TaskAttemptEventType.TA_SCHEDULE));
  }
  taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
  assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
      TaskAttemptState.KILLED);
  assertEquals("Task attempt's internal state is not KILLED",
      taImpl.getInternalState(), TaskAttemptStateInternal.KILLED);
  assertFalse("InternalError occurred", eventHandler.internalError);
  TaskEvent event = eventHandler.lastTaskEvent;
  assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
  // In NEW state, new map attempt should not be rescheduled.
  assertFalse(((TaskTAttemptKilledEvent)event).getRescheduleAttempt());
}
项目:hadoop-TCP    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:hadoop-TCP    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:hardfs    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:hardfs    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:hadoop-on-lustre2    文件:MapTaskImpl.java   
@Override
protected TaskAttemptImpl createAttempt() {
  return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
      eventHandler, jobFile,
      partition, taskSplitMetaInfo, conf, taskAttemptListener,
      jobToken, credentials, clock, appContext);
}
项目:hadoop-on-lustre2    文件:TestTaskAttempt.java   
private TaskAttemptImpl createMapTaskAttemptImplForTest(
    EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          taskSplitMetaInfo, jobConf, taListener, null,
          null, clock, null);
  return taImpl;
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testLaunchFailedWhileKilling() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), null);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
  assertFalse(eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local node", 
      Locality.NODE_LOCAL, taImpl.getLocality());
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local rack",
      Locality.RACK_LOCAL, taImpl.getLocality());
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));

  assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
      TaskAttemptState.COMMIT_PENDING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
      taImpl.getLocality());
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testDoubleTooManyFetchFailure() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));

  assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
      TaskAttemptState.SUCCEEDED);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
      TaskAttemptState.FAILED);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
      TaskAttemptState.FAILED);
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testAppDiognosticEventOnUnassignedTask() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
      appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(
      new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener,
      new Token(), new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
      "Task got killed"));
  assertFalse(
      "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
      eventHandler.internalError);
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testTooManyFetchFailureAfterKill() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
      splits, jobConf, taListener,
      mock(Token.class), new Credentials(),
      new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
    container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_CONTAINER_CLEANED));

  assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
    TaskAttemptState.SUCCEEDED);
  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_KILL));
  assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
    TaskAttemptState.KILLED);
  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in KILLED state, still", taImpl.getState(),
    TaskAttemptState.KILLED);
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
    eventHandler.internalError);
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testAppDiognosticEventOnNewTask() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
      appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(
      new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener,
      new Token(), new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");
  taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
      "Task got killed"));
  assertFalse(
      "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
      eventHandler.internalError);
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
 public void testFetchFailureAttemptFinishTime() throws Exception{
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
Path jobFile = mock(Path.class);

MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
when(taListener.getAddress()).thenReturn(
    new InetSocketAddress("localhost", 0));

JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
jobConf.setBoolean("fs.file.impl.disable.cache", true);
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

TaskAttemptImpl taImpl =
  new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
  splits, jobConf, taListener,mock(Token.class), new Credentials(),
  new SystemClock(), appCtx);

NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0"); 

taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_SCHEDULE));
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
    container, mock(Map.class)));
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_DONE));
taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_CONTAINER_CLEANED));

assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
          TaskAttemptState.SUCCEEDED);

assertTrue("Task Attempt finish time is not greater than 0", 
        taImpl.getFinishTime() > 0);

Long finishTime = taImpl.getFinishTime();
Thread.sleep(5);   
taImpl.handle(new TaskAttemptEvent(attemptId,
   TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));

assertEquals("Task attempt is not in Too Many Fetch Failure state", 
        taImpl.getState(), TaskAttemptState.FAILED);

assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
    + " Task attempt finish time is not the same ",
    finishTime, Long.valueOf(taImpl.getFinishTime()));  
 }
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testContainerKillAfterAssigned() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
      0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(
      new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(),
      new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  assertEquals("Task attempt is not in assinged state",
      taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  assertEquals("Task should be in KILLED state",
      TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
      taImpl.getInternalState());
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testContainerKillWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
      0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(
      new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(),
      new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  assertFalse("InternalError occurred trying to handle TA_KILL",
      eventHandler.internalError);
  assertEquals("Task should be in KILLED state",
      TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
      taImpl.getInternalState());
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testContainerKillWhileCommitPending() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
      0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(
      new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(),
      new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));
  assertEquals("Task should be in COMMIT_PENDING state",
      TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  assertFalse("InternalError occurred trying to handle TA_KILL",
      eventHandler.internalError);
  assertEquals("Task should be in KILLED state",
      TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
      taImpl.getInternalState());
}
项目:hadoop    文件:TestShuffleProvider.java   
@Test
public void testShuffleProviders() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  Path jobFile = mock(Path.class);

  EventHandler eventHandler = mock(EventHandler.class);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");

  jobConf.set(YarnConfiguration.NM_AUX_SERVICES,
    TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID + "," +
    TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID);

  String serviceName = TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID;
  String serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName);
  jobConf.set(serviceStr, TestShuffleHandler1.class.getName());

  serviceName = TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID;
  serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName);
  jobConf.set(serviceStr, TestShuffleHandler2.class.getName());

  jobConf.set(MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES,
                TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID
                   + "," + TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID);

  Credentials credentials = new Credentials();
  Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
      ("tokenid").getBytes(), ("tokenpw").getBytes(),
      new Text("tokenkind"), new Text("tokenservice"));
  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          mock(TaskSplitMetaInfo.class), jobConf, taListener,
          jobToken, credentials,
          new SystemClock(), null);

  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

  ContainerLaunchContext launchCtx =
      TaskAttemptImpl.createContainerLaunchContext(null,
          jobConf, jobToken, taImpl.createRemoteTask(),
          TypeConverter.fromYarn(jobId),
          mock(WrappedJvmID.class), taListener,
          credentials);

  Map<String, ByteBuffer> serviceDataMap = launchCtx.getServiceData();
  Assert.assertNotNull("TestShuffleHandler1 is missing", serviceDataMap.get(TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID));
  Assert.assertNotNull("TestShuffleHandler2 is missing", serviceDataMap.get(TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID));
  Assert.assertTrue("mismatch number of services in map", serviceDataMap.size() == 3); // 2 that we entered + 1 for the built-in shuffle-provider
}
项目:hadoop    文件:TestTaskAttemptContainerRequest.java   
@Test
public void testAttemptContainerRequest() throws Exception {
  final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
  final byte[] SECRET_KEY = ("secretkey").getBytes();
  Map<ApplicationAccessType, String> acls =
      new HashMap<ApplicationAccessType, String>(1);
  acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  Path jobFile = mock(Path.class);

  EventHandler eventHandler = mock(EventHandler.class);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");

  // setup UGI for security so tokens and keys are preserved
  jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
  UserGroupInformation.setConfiguration(jobConf);

  Credentials credentials = new Credentials();
  credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
  Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
      ("tokenid").getBytes(), ("tokenpw").getBytes(),
      new Text("tokenkind"), new Text("tokenservice"));

  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
          mock(TaskSplitMetaInfo.class), jobConf, taListener,
          jobToken, credentials,
          new SystemClock(), null);

  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

  ContainerLaunchContext launchCtx =
      TaskAttemptImpl.createContainerLaunchContext(acls,
          jobConf, jobToken, taImpl.createRemoteTask(),
          TypeConverter.fromYarn(jobId),
          mock(WrappedJvmID.class), taListener,
          credentials);

  Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
  Credentials launchCredentials = new Credentials();

  DataInputByteBuffer dibb = new DataInputByteBuffer();
  dibb.reset(launchCtx.getTokens());
  launchCredentials.readTokenStorageStream(dibb);

  // verify all tokens specified for the task attempt are in the launch context
  for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
    Token<? extends TokenIdentifier> launchToken =
        launchCredentials.getToken(token.getService());
    Assert.assertNotNull("Token " + token.getService() + " is missing",
        launchToken);
    Assert.assertEquals("Token " + token.getService() + " mismatch",
        token, launchToken);
  }

  // verify the secret key is in the launch context
  Assert.assertNotNull("Secret key missing",
      launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
  Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
      launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testLaunchFailedWhileKilling() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), null);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
  assertFalse(eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local node", 
      Locality.NODE_LOCAL, taImpl.getLocality());
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local rack",
      Locality.RACK_LOCAL, taImpl.getLocality());
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));

  assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
      TaskAttemptState.COMMIT_PENDING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
      taImpl.getLocality());
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testDoubleTooManyFetchFailure() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
  TaskAttemptId reduceTAId =
      MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_COMPLETED));

  assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
      TaskAttemptState.SUCCEEDED);
  taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
      reduceTAId, "Host"));
  assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
      TaskAttemptState.FAILED);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
      TaskAttemptState.FAILED);
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testAppDiognosticEventOnUnassignedTask() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
      appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(
      new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener,
      new Token(), new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
      "Task got killed"));
  assertFalse(
      "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
      eventHandler.internalError);
  try {
    taImpl.handle(new TaskAttemptEvent(attemptId,
        TaskAttemptEventType.TA_KILL));
    Assert.assertTrue("No exception on UNASSIGNED STATE KILL event", true);
  } catch (Exception e) {
    Assert.assertFalse(
        "Exception not expected for UNASSIGNED STATE KILL event", true);
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testTooManyFetchFailureAfterKill() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
      splits, jobConf, taListener,
      mock(Token.class), new Credentials(),
      new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
    container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_CONTAINER_COMPLETED));

  assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
    TaskAttemptState.SUCCEEDED);
  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_KILL));
  assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
    TaskAttemptState.KILLED);
  taImpl.handle(new TaskAttemptEvent(attemptId,
    TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in KILLED state, still", taImpl.getState(),
    TaskAttemptState.KILLED);
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
    eventHandler.internalError);
}