Java 类org.apache.hadoop.mapred.JvmManager.JvmManagerForType 实例源码

项目:mapreduce-fork    文件:TestJvmManager.java   
/**
 * Create a bunch of tasks and use a special hash map to detect
 * racy access to the various internal data structures of JvmManager.
 * (Regression test for MAPREDUCE-2224)
 *
 * <p>The test swaps the three bookkeeping maps of the MAP-type
 * {@code JvmManagerForType} for {@code RaceHashMap} instances, then launches
 * {@code MAP_SLOTS * 5} short-lived scripts through a pool of
 * {@code MAP_SLOTS} threads. The first {@link Throwable} raised by any
 * launcher is recorded and rethrown from the test thread.
 *
 * @throws Exception if a task cannot be set up, if any launcher fails
 *         (the first failure is wrapped in a {@link RuntimeException}), or
 *         if the calling thread is interrupted while waiting
 */
@Test
public void testForRaces() throws Exception {
  JvmManagerForType mapJvmManager = jvmManager
      .getJvmManagerForType(TaskType.MAP);

  // Sub out the HashMaps for maps that will detect racy access.
  mapJvmManager.jvmToRunningTask = new RaceHashMap<JVMId, TaskRunner>();
  mapJvmManager.runningTaskToJvm = new RaceHashMap<TaskRunner, JVMId>();
  mapJvmManager.jvmIdToRunner = new RaceHashMap<JVMId, JvmRunner>();

  // Launch a bunch of JVMs, but only allow MAP_SLOTS to run at once.
  final ExecutorService exec = Executors.newFixedThreadPool(MAP_SLOTS);
  final AtomicReference<Throwable> failed =
    new AtomicReference<Throwable>();

  boolean terminated;
  try {
    for (int i = 0; i < MAP_SLOTS*5; i++) {
      if (failed.get() != null) {
        // A launcher already failed and shut the pool down; stop queueing.
        break;
      }

      JobConf taskConf = new JobConf(ttConf);
      TaskAttemptID attemptID = new TaskAttemptID("test", 0, TaskType.MAP, i, 0);
      Task task = new MapTask(null, attemptID, i, null, 1);
      task.setConf(taskConf);
      TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
      File pidFile = new File(TEST_DIR, "pid_" + i);
      final TaskRunner taskRunner = task.createRunner(tt, tip);
      // The "JVM" is a script whose body is just "echo hi".
      final Vector<String> vargs = new Vector<String>(2);
      vargs.add(writeScript("script_" + i, "echo hi\n", pidFile).getAbsolutePath());
      final File workDir = new File(TEST_DIR, "work_" + i);
      // mkdir() returns false when the directory is left over from an earlier
      // run, so only treat it as an error if no directory exists afterwards.
      if (!workDir.mkdir() && !workDir.isDirectory()) {
        throw new IllegalStateException(
            "Could not create work directory " + workDir);
      }
      final File stdout = new File(TEST_DIR, "stdout_" + i);
      final File stderr = new File(TEST_DIR, "stderr_" + i);

      // launch the process and wait in a thread, till it finishes
      Runnable launcher = new Runnable() {
        public void run() {
          try {
            taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100,
                workDir, null);
          } catch (Throwable t) {
            // Keep only the first failure, then stop the remaining launchers.
            failed.compareAndSet(null, t);
            exec.shutdownNow();
          }
        }
      };

      try {
        exec.submit(launcher);
      } catch (java.util.concurrent.RejectedExecutionException e) {
        // A failing launcher shut the pool down between the check at the top
        // of the loop and this submit. Report that failure, not the rejection.
        if (failed.get() == null) {
          throw e;
        }
        break;
      }
    }

    exec.shutdown();
    terminated = exec.awaitTermination(3, TimeUnit.MINUTES);
  } finally {
    // Has no effect once the pool has terminated; otherwise it interrupts
    // leftover launchers so worker threads do not outlive the test.
    exec.shutdownNow();
  }

  if (failed.get() != null) {
    throw new RuntimeException(failed.get());
  }
  if (!terminated) {
    throw new AssertionError(
        "Launched JVMs did not all finish within 3 minutes");
  }
}