Example source code for the Java class org.apache.hadoop.mapred.TaskTracker

Project: hadoop-2.6.0-cdh5.4.3    File: Localizer.java
/**
 * Create taskDirs on all the disks. Otherwise, in some cases (e.g., when
 * LinuxTaskController is in use) the child might wish to balance load
 * across disks but cannot create the attempt directory itself, because the
 * job directory is writable only by the TT.
 * 
 * @param user
 * @param jobId
 * @param attemptId
 * @throws IOException
 */
public void initializeAttemptDirs(String user, String jobId,
    String attemptId)
    throws IOException {

  boolean initStatus = false;
  String attemptDirPath =
      TaskTracker.getLocalTaskDir(user, jobId, attemptId);

  for (String localDir : localDirs) {
    Path localAttemptDir = new Path(localDir, attemptDirPath);

    boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
    if (!attemptDirStatus) {
      LOG.warn("localAttemptDir " + localAttemptDir.toString()
          + " couldn't be created.");
    }
    initStatus = initStatus || attemptDirStatus;
  }

  if (!initStatus) {
    throw new IOException("Not able to initialize attempt directories "
        + "in any of the configured local directories for the attempt "
        + attemptId);
  }
}
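A minimal caller-side sketch of the method above, assuming a Localizer already wired to the TT's local FileSystem and mapred.local.dir list (the identifiers, IDs, and LOG below are illustrative, not taken from the snippet):

// Hypothetical usage: localize the directories for one task attempt.
String user = "alice";
String jobId = "job_201101010000_0001";
String attemptId = "attempt_201101010000_0001_m_000000_0";
try {
  localizer.initializeAttemptDirs(user, jobId, attemptId);
} catch (IOException e) {
  // Thrown only when the attempt dir could not be created on ANY disk;
  // failures on a subset of disks are merely logged by the method.
  LOG.error("Could not localize attempt " + attemptId, e);
}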
Project: hadoop-2.6.0-cdh5.4.3    File: TestTrackerDistributedCacheManager.java
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager =
    new TrackerDistributedCacheManager(conf, taskController);
  conf.set("fs.fakefile.impl", FileSystem.getFileSystemClass("file", conf).getName());
  String userName = getJobOwnerName();
  conf.set("user.name", userName);
  Path fileToCache = new Path("fakefile:///"
      + firstCacheFile.toUri().getPath());
  CacheFile file = new CacheFile(fileToCache.toUri(), 
                               CacheFile.FileType.REGULAR, 
                               false, 0, false);
  Path result = manager.getLocalCache(fileToCache.toUri(), conf,
      TaskTracker.getPrivateDistributedCacheDir(userName),
      fs.getFileStatus(firstCacheFile), false,
      System.currentTimeMillis(),
      false, file);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      result);
}
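The test relies on aliasing an unregistered fakefile: scheme to the local filesystem class, so getLocalCache exercises a non-default FileSystem without needing a real remote cluster. The same trick in isolation, as a hedged sketch (fs.<scheme>.impl expects a FileSystem class name in this Hadoop line):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Map the made-up "fakefile" scheme onto the local FS implementation so
// that Path("fakefile:///tmp/x") resolves through LocalFileSystem.
Configuration conf = new Configuration();
conf.set("fs.fakefile.impl",
    FileSystem.getFileSystemClass("file", conf).getName());
FileSystem fakeFs = FileSystem.get(URI.create("fakefile:///"), conf);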
Project: hadoop-on-lustre    File: Localizer.java
/**
 * Create taskDirs on all the disks. Otherwise, in some cases (e.g., when
 * LinuxTaskController is in use) the child might wish to balance load
 * across disks but cannot create the attempt directory itself, because the
 * job directory is writable only by the TT.
 * 
 * @param user
 * @param jobId
 * @param attemptId
 * @throws IOException
 */
public void initializeAttemptDirs(String user, String jobId,
    String attemptId)
    throws IOException {

  boolean initStatus = false;
  String attemptDirPath =
      TaskTracker.getLocalTaskDir(user, jobId, attemptId);

  for (String localDir : localDirs) {
    Path localAttemptDir = new Path(localDir, attemptDirPath);

    boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
    if (!attemptDirStatus) {
      LOG.warn("localAttemptDir " + localAttemptDir.toString()
          + " couldn't be created.");
    }
    initStatus = initStatus || attemptDirStatus;
  }

  if (!initStatus) {
    throw new IOException("Not able to initialize attempt directories "
        + "in any of the configured local directories for the attempt "
        + attemptId);
  }
}
Project: hadoop-on-lustre    File: TestTrackerDistributedCacheManager.java
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager =
    new TrackerDistributedCacheManager(conf, taskController);
  conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
  String userName = getJobOwnerName();
  conf.set("user.name", userName);
  Path fileToCache = new Path("fakefile:///"
      + firstCacheFile.toUri().getPath());
  CacheFile file = new CacheFile(fileToCache.toUri(), 
                               CacheFile.FileType.REGULAR, 
                               false, 0, false);
  Path result = manager.getLocalCache(fileToCache.toUri(), conf,
      TaskTracker.getPrivateDistributedCacheDir(userName),
      fs.getFileStatus(firstCacheFile), false,
      System.currentTimeMillis(),
      false, file);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      result);
}
Project: hanoi-hadoop-2.0.0-cdh    File: Localizer.java
/**
 * Create taskDirs on all the disks. Otherwise, in some cases (e.g., when
 * LinuxTaskController is in use) the child might wish to balance load
 * across disks but cannot create the attempt directory itself, because the
 * job directory is writable only by the TT.
 * 
 * @param user
 * @param jobId
 * @param attemptId
 * @throws IOException
 */
public void initializeAttemptDirs(String user, String jobId,
    String attemptId)
    throws IOException {

  boolean initStatus = false;
  String attemptDirPath =
      TaskTracker.getLocalTaskDir(user, jobId, attemptId);

  for (String localDir : localDirs) {
    Path localAttemptDir = new Path(localDir, attemptDirPath);

    boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
    if (!attemptDirStatus) {
      LOG.warn("localAttemptDir " + localAttemptDir.toString()
          + " couldn't be created.");
    }
    initStatus = initStatus || attemptDirStatus;
  }

  if (!initStatus) {
    throw new IOException("Not able to initialize attempt directories "
        + "in any of the configured local directories for the attempt "
        + attemptId);
  }
}
Project: hanoi-hadoop-2.0.0-cdh    File: TestTrackerDistributedCacheManager.java
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager =
    new TrackerDistributedCacheManager(conf, taskController);
  conf.set("fs.fakefile.impl", FileSystem.getFileSystemClass("file", conf).getName());
  String userName = getJobOwnerName();
  conf.set("user.name", userName);
  Path fileToCache = new Path("fakefile:///"
      + firstCacheFile.toUri().getPath());
  CacheFile file = new CacheFile(fileToCache.toUri(), 
                               CacheFile.FileType.REGULAR, 
                               false, 0, false);
  Path result = manager.getLocalCache(fileToCache.toUri(), conf,
      TaskTracker.getPrivateDistributedCacheDir(userName),
      fs.getFileStatus(firstCacheFile), false,
      System.currentTimeMillis(),
      false, file);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      result);
}
Project: mapreduce-fork    File: Localizer.java
/**
 * Create taskDirs on all the disks. Otherwise, in some cases (e.g., when
 * LinuxTaskController is in use) the child might wish to balance load
 * across disks but cannot create the attempt directory itself, because the
 * job directory is writable only by the TT.
 * 
 * @param user
 * @param jobId
 * @param attemptId
 * @param isCleanupAttempt
 * @throws IOException
 */
public void initializeAttemptDirs(String user, String jobId,
    String attemptId, boolean isCleanupAttempt)
    throws IOException {

  boolean initStatus = false;
  String attemptDirPath =
      TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);

  for (String localDir : localDirs) {
    Path localAttemptDir = new Path(localDir, attemptDirPath);

    boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
    if (!attemptDirStatus) {
      LOG.warn("localAttemptDir " + localAttemptDir.toString()
          + " couldn't be created.");
    }
    initStatus = initStatus || attemptDirStatus;
  }

  if (!initStatus) {
    throw new IOException("Not able to initialize attempt directories "
        + "in any of the configured local directories for the attempt "
        + attemptId);
  }
}
Project: mapreduce-fork    File: TestTrackerDistributedCacheManager.java
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager =
    new TrackerDistributedCacheManager(conf, taskController);
  conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
  String userName = getJobOwnerName();
  conf.set(MRJobConfig.USER_NAME, userName);
  Path fileToCache = new Path("fakefile:///"
      + firstCacheFile.toUri().getPath());
  Path result = manager.getLocalCache(fileToCache.toUri(), conf,
      TaskTracker.getPrivateDistributedCacheDir(userName),
      fs.getFileStatus(firstCacheFile), false,
      getFileStamp(firstCacheFile),
      new Path(TEST_ROOT_DIR), false, false);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      result);
}
Project: hortonworks-extension    File: Localizer.java
/**
 * Create taskDirs on all the disks. Otherwise, in some cases (e.g., when
 * LinuxTaskController is in use) the child might wish to balance load
 * across disks but cannot create the attempt directory itself, because the
 * job directory is writable only by the TT.
 * 
 * @param user
 * @param jobId
 * @param attemptId
 * @throws IOException
 */
public void initializeAttemptDirs(String user, String jobId,
    String attemptId)
    throws IOException {

  boolean initStatus = false;
  String attemptDirPath =
      TaskTracker.getLocalTaskDir(user, jobId, attemptId);

  for (String localDir : localDirs) {
    Path localAttemptDir = new Path(localDir, attemptDirPath);

    boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
    if (!attemptDirStatus) {
      LOG.warn("localAttemptDir " + localAttemptDir.toString()
          + " couldn't be created.");
    }
    initStatus = initStatus || attemptDirStatus;
  }

  if (!initStatus) {
    throw new IOException("Not able to initialize attempt directories "
        + "in any of the configured local directories for the attempt "
        + attemptId);
  }
}
Project: hortonworks-extension    File: TestTrackerDistributedCacheManager.java
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager =
    new TrackerDistributedCacheManager(conf, taskController);
  conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
  String userName = getJobOwnerName();
  conf.set("user.name", userName);
  Path fileToCache = new Path("fakefile:///"
      + firstCacheFile.toUri().getPath());
  CacheFile file = new CacheFile(fileToCache.toUri(), 
                               CacheFile.FileType.REGULAR, 
                               false, 0, false);
  Path result = manager.getLocalCache(fileToCache.toUri(), conf,
      TaskTracker.getPrivateDistributedCacheDir(userName),
      fs.getFileStatus(firstCacheFile), false,
      System.currentTimeMillis(),
      false, file);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      result);
}
Project: hadoop-2.6.0-cdh5.4.3    File: TaskMemoryManagerThread.java
public TaskMemoryManagerThread(TaskTracker taskTracker) {

  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
      5000L));

  this.taskTracker = taskTracker;
  long reservedRssMemory = taskTracker.getReservedPhysicalMemoryOnTT();
  long totalPhysicalMemoryOnTT = taskTracker.getTotalPhysicalMemoryOnTT();
  if (reservedRssMemory == JobConf.DISABLED_MEMORY_LIMIT ||
      totalPhysicalMemoryOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
    maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
    LOG.info("Physical memory monitoring disabled");
  } else {
    maxRssMemoryAllowedForAllTasks =
              totalPhysicalMemoryOnTT - reservedRssMemory;
    if (maxRssMemoryAllowedForAllTasks < 0) {
      maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
      LOG.warn("Reserved physical memory exceeds total. Physical memory " +
          "monitoring disabled.");
    } else {
      LOG.info(String.format("Physical memory monitoring enabled. " +
          "System total: %s. Reserved: %s. Maximum: %s.",
          totalPhysicalMemoryOnTT, reservedRssMemory,
          maxRssMemoryAllowedForAllTasks));
    }
  }
}
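The enable/disable decision above reduces to totalPhysicalMemoryOnTT - reservedRssMemory, with JobConf.DISABLED_MEMORY_LIMIT used as a sentinel at every step. The same logic as a standalone helper, for clarity (a sketch only, not part of the TaskTracker API):

// Sketch: compute the RSS ceiling for all tasks, or the disabled sentinel.
// DISABLED stands in for JobConf.DISABLED_MEMORY_LIMIT from the source.
static final long DISABLED = -1L;

static long maxRssAllowedForAllTasks(long totalPhysical, long reservedRss) {
  if (totalPhysical == DISABLED || reservedRss == DISABLED) {
    return DISABLED;                    // either value unknown: monitoring off
  }
  long budget = totalPhysical - reservedRss;
  return budget < 0 ? DISABLED : budget; // over-reserved: monitoring off too
}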
Project: hadoop-2.6.0-cdh5.4.3    File: Localizer.java
/**
 * Prepare the job directories for a given job. To be called by the job
 * localization code, only if the job is not already localized.
 * 
 * <br>
 * Here, we set 700 permissions on the job directories created on all disks.
 * We do this to prevent any misuse by other users until
 * {@link TaskController#initializeJob} runs later and sets the proper
 * private permissions on the job directories. <br>
 * 
 * @param user
 * @param jobId
 * @throws IOException
 */
public void initializeJobDirs(String user, JobID jobId)
    throws IOException {
  boolean initJobDirStatus = false;
  String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
  for (String localDir : localDirs) {
    Path jobDir = new Path(localDir, jobDirPath);
    if (fs.exists(jobDir)) {
      // This happens on a partial execution of localizeJob: copying job.xml
      // to the local disk may succeed while copying job.jar throws an
      // exception. Clean up and then try again.
      fs.delete(jobDir, true);
    }

    boolean jobDirStatus = fs.mkdirs(jobDir);
    if (!jobDirStatus) {
      LOG.warn("Not able to create job directory " + jobDir.toString());
    }

    initJobDirStatus = initJobDirStatus || jobDirStatus;

    // job-dir has to be private to the TT
    fs.setPermission(jobDir, new FsPermission((short)0700));
  }

  if (!initJobDirStatus) {
    throw new IOException("Not able to initialize job directories "
        + "in any of the configured local directories for job "
        + jobId.toString());
  }
}
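A minimal caller-side sketch (hypothetical; the real call site is the TT's job-localization path). In these branches, getLocalJobDir resolves to roughly taskTracker/<user>/jobcache/<jobId> under each local dir:

// Hypothetical usage: prepare the per-job dirs before user-level job init.
JobID jobId = JobID.forName("job_201101010000_0001");
localizer.initializeJobDirs("alice", jobId);
// Every created <local-dir>/taskTracker/alice/jobcache/<jobId> is now
// mode 0700 and private to the TT; TaskController#initializeJob later
// sets the final private permissions for the job owner.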
Project: hadoop-on-lustre    File: TaskMemoryManagerThread.java
public TaskMemoryManagerThread(TaskTracker taskTracker) {

  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
      5000L));

  this.taskTracker = taskTracker;
}
Project: hadoop-0.20    File: TaskMemoryManagerThread.java
public TaskMemoryManagerThread(TaskTracker taskTracker) {

  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
      5000L),
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
      ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL));

  this.taskTracker = taskTracker;
}
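Both constructor arguments are plain TT configuration values, so the knobs can be tuned like any other mapred-site property. A hedged example (property names as used in the snippet; the values are illustrative):

JobConf conf = new JobConf();
// Poll task memory usage every 3 s instead of the 5000 ms default.
conf.setLong(
    "mapred.tasktracker.taskmemorymanager.monitoring-interval", 3000L);
// Wait 10 s between SIGTERM and SIGKILL when killing an over-limit tree.
conf.setLong(
    "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
    10000L);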
Project: hanoi-hadoop-2.0.0-cdh    File: TaskMemoryManagerThread.java
public TaskMemoryManagerThread(TaskTracker taskTracker) {

  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
      5000L));

  this.taskTracker = taskTracker;
  long reservedRssMemory = taskTracker.getReservedPhysicalMemoryOnTT();
  long totalPhysicalMemoryOnTT = taskTracker.getTotalPhysicalMemoryOnTT();
  if (reservedRssMemory == JobConf.DISABLED_MEMORY_LIMIT ||
      totalPhysicalMemoryOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
    maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
    LOG.info("Physical memory monitoring disabled");
  } else {
    maxRssMemoryAllowedForAllTasks =
              totalPhysicalMemoryOnTT - reservedRssMemory;
    if (maxRssMemoryAllowedForAllTasks < 0) {
      maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
      LOG.warn("Reserved physical memory exceeds total. Physical memory " +
          "monitoring disabled.");
    } else {
      LOG.info(String.format("Physical memory monitoring enabled. " +
          "System total: %s. Reserved: %s. Maximum: %s.",
          totalPhysicalMemoryOnTT, reservedRssMemory,
          maxRssMemoryAllowedForAllTasks));
    }
  }
}
Project: hanoi-hadoop-2.0.0-cdh    File: Localizer.java
/**
 * Prepare the job directories for a given job. To be called by the job
 * localization code, only if the job is not already localized.
 * 
 * <br>
 * Here, we set 700 permissions on the job directories created on all disks.
 * We do this to prevent any misuse by other users until
 * {@link TaskController#initializeJob} runs later and sets the proper
 * private permissions on the job directories. <br>
 * 
 * @param user
 * @param jobId
 * @throws IOException
 */
public void initializeJobDirs(String user, JobID jobId)
    throws IOException {
  boolean initJobDirStatus = false;
  String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
  for (String localDir : localDirs) {
    Path jobDir = new Path(localDir, jobDirPath);
    if (fs.exists(jobDir)) {
      // This happens on a partial execution of localizeJob: copying job.xml
      // to the local disk may succeed while copying job.jar throws an
      // exception. Clean up and then try again.
      fs.delete(jobDir, true);
    }

    boolean jobDirStatus = fs.mkdirs(jobDir);
    if (!jobDirStatus) {
      LOG.warn("Not able to create job directory " + jobDir.toString());
    }

    initJobDirStatus = initJobDirStatus || jobDirStatus;

    // job-dir has to be private to the TT
    fs.setPermission(jobDir, new FsPermission((short)0700));
  }

  if (!initJobDirStatus) {
    throw new IOException("Not able to initialize job directories "
        + "in any of the configured local directories for job "
        + jobId.toString());
  }
}
Project: mapreduce-fork    File: TaskMemoryManagerThread.java
public TaskMemoryManagerThread(TaskTracker taskTracker) {
  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
    taskTracker.getJobConf().getLong(
      TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 5000L));         
  this.taskTracker = taskTracker;
  long reservedRssMemory = taskTracker.getReservedPhysicalMemoryOnTT();
  long totalPhysicalMemoryOnTT = taskTracker.getTotalPhysicalMemoryOnTT();
  if (reservedRssMemory == JobConf.DISABLED_MEMORY_LIMIT ||
      totalPhysicalMemoryOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
    maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
  } else {
    maxRssMemoryAllowedForAllTasks =
              totalPhysicalMemoryOnTT - reservedRssMemory;
  }
}
Project: mapreduce-fork    File: Localizer.java
/**
 * Prepare the job directories for a given job. To be called by the job
 * localization code, only if the job is not already localized.
 * 
 * <br>
 * Here, we set 700 permissions on the job directories created on all disks.
 * We do this to prevent any misuse by other users until
 * {@link TaskController#initializeJob(JobInitializationContext)} runs later
 * and sets the proper private permissions on the job directories. <br>
 * 
 * @param user
 * @param jobId
 * @throws IOException
 */
public void initializeJobDirs(String user, JobID jobId)
    throws IOException {
  boolean initJobDirStatus = false;
  String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
  for (String localDir : localDirs) {
    Path jobDir = new Path(localDir, jobDirPath);
    if (fs.exists(jobDir)) {
      // This happens on a partial execution of localizeJob: copying job.xml
      // to the local disk may succeed while copying job.jar throws an
      // exception. Clean up and then try again.
      fs.delete(jobDir, true);
    }

    boolean jobDirStatus = fs.mkdirs(jobDir);
    if (!jobDirStatus) {
      LOG.warn("Not able to create job directory " + jobDir.toString());
    }

    initJobDirStatus = initJobDirStatus || jobDirStatus;

    // job-dir has to be private to the TT
    fs.setPermission(jobDir, new FsPermission((short)0700));
  }

  if (!initJobDirStatus) {
    throw new IOException("Not able to initialize job directories "
        + "in any of the configured local directories for job "
        + jobId.toString());
  }
}
Project: mapreduce-fork    File: MapOutput.java
MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, 
          JobConf conf, LocalDirAllocator localDirAllocator,
          int fetcher, boolean primaryMapOutput)  throws IOException {
  this.id = ID.incrementAndGet();
  this.mapId = mapId;
  this.merger = merger;

  type = Type.DISK;

  memory = null;
  byteStream = null;

  this.size = size;

  this.localFS = FileSystem.getLocal(conf);
  String filename = "map_" + mapId.getTaskID().getId() + ".out";
  String tmpOutput = Path.SEPARATOR +
                          TaskTracker.getJobCacheSubdir(conf.getUser()) +
                     Path.SEPARATOR + mapId.getJobID() +
                     Path.SEPARATOR + merger.getReduceId() +
                     Path.SEPARATOR + "output" + 
                     Path.SEPARATOR + filename + 
                     "." + fetcher; 

  tmpOutputPath = 
    localDirAllocator.getLocalPathForWrite(tmpOutput, size, conf);
  outputPath = new Path(tmpOutputPath.getParent(), filename);
  disk = localFS.create(tmpOutputPath);

  this.primaryMapOutput = primaryMapOutput;
}
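Piecing the concatenation together, the temporary file for a fetch lands under the job cache of the owning user; with illustrative IDs the layout looks roughly like this (the <local-dir> root is picked by LocalDirAllocator based on available space):

// <local-dir>/taskTracker/alice/jobcache/job_201101010000_0001/
//     attempt_201101010000_0001_r_000000_0/output/map_17.out.3
//
// outputPath is the same name without the trailing ".3" fetcher suffix;
// the temporary file is expected to be renamed onto it once the fetched
// map output has been fully written.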
Project: mapreduce-fork    File: TestTrackerDistributedCacheManager.java
/**
 * Localize a file. After localization is complete, create a file, "myFile",
 * under the directory where the file is localized and ensure that it has
 * permissions different from what is set by default. Then, localize another
 * file. Verify that "myFile" has the right permissions.
 * @throws Exception
 */
public void testCustomPermissions() throws Exception {
  if (!canRun()) {
    return;
  }
  String userName = getJobOwnerName();
  conf.set(MRJobConfig.USER_NAME, userName);
  TrackerDistributedCacheManager manager = 
      new TrackerDistributedCacheManager(conf, taskController);
  FileSystem localfs = FileSystem.getLocal(conf);

  Path[] localCache = new Path[2];
  localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, 
      TaskTracker.getPrivateDistributedCacheDir(userName),
      fs.getFileStatus(firstCacheFile), false,
      getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
  FsPermission myPermission = new FsPermission((short)0600);
  Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
  if (FileSystem.create(localfs, myFile, myPermission) == null) {
    throw new IOException("Could not create " + myFile);
  }
  try {
    localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, 
        TaskTracker.getPrivateDistributedCacheDir(userName),
        fs.getFileStatus(secondCacheFile), false, 
        getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false,
        false);
    FileStatus stat = localfs.getFileStatus(myFile);
    assertTrue(stat.getPermission().equals(myPermission));
    // validate permissions of localized files.
    checkFilePermissions(localCache);
  } finally {
    localfs.delete(myFile, false);
  }
}
Project: mammoth    File: TaskMemoryManagerThread.java
public TaskMemoryManagerThread(TaskTracker taskTracker) {

  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
      5000L));

  this.taskTracker = taskTracker;
}
Project: hortonworks-extension    File: TaskMemoryManagerThread.java
public TaskMemoryManagerThread(TaskTracker taskTracker) {

  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
      5000L));

  this.taskTracker = taskTracker;
}
Project: hadoop-gpu    File: TaskMemoryManagerThread.java
public TaskMemoryManagerThread(TaskTracker taskTracker) {

  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
      5000L),
    taskTracker.getJobConf().getLong(
      "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
      ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL));

  this.taskTracker = taskTracker;
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestTrackerDistributedCacheManager.java
  /**
   * This is the typical flow for using the DistributedCache classes.
   * 
   * @throws IOException
   * @throws LoginException
   */
  public void testManagerFlow() throws IOException, LoginException {
    if (!canRun()) {
      return;
    }

    // ****** Imitate JobClient code
    // Configures a task/job with both a regular file and a "classpath" file.
    Configuration subConf = new Configuration(conf);
    String userName = getJobOwnerName();
    subConf.set("user.name", userName);
    JobID jobid = new JobID("jt",1);
    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
    DistributedCache.addFileToClassPath(secondCacheFile, subConf, 
                                        FileSystem.get(subConf));

    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
    TrackerDistributedCacheManager.determineTimestamps(subConf, statCache);
    TrackerDistributedCacheManager.determineCacheVisibilities(subConf, statCache);
    assertEquals(2, statCache.size());
    // ****** End of imitating JobClient code

    Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
    FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
    subConf.writeXml(os);
    os.close();

    // ****** Imitate TaskRunner code.
    TrackerDistributedCacheManager manager = 
      new TrackerDistributedCacheManager(conf, taskController);
    TaskDistributedCacheManager handle =
      manager.newTaskDistributedCacheManager(jobid, subConf);
    assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
    File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), 
        TaskTracker.getPrivateDistributedCacheDir(userName));
    JobLocalizer.downloadPrivateCache(subConf);
    // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
//    handle.setupPrivateCache(localDirAllocator, TaskTracker
//        .getPrivateDistributedCacheDir(userName));
//    // ****** End of imitating TaskRunner code

    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
    assertNotNull(null, localCacheFiles);
    assertEquals(2, localCacheFiles.length);
    Path cachedFirstFile = localCacheFiles[0];
    Path cachedSecondFile = localCacheFiles[1];
    assertFileLengthEquals(firstCacheFile, cachedFirstFile);
    assertFalse("Paths should be different.", 
        firstCacheFile.equals(cachedFirstFile));

    assertEquals(1, handle.getClassPaths().size());
    assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));

    checkFilePermissions(localCacheFiles);

    // Cleanup
    handle.release();
    manager.purgeCache();
    assertFalse(pathToFile(cachedFirstFile).exists());
  }
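Condensed, the job-submission half of this flow is three client-side calls plus the timestamp/visibility stamping that JobClient normally performs. A minimal sketch using the same APIs as the test, and assuming the same imports (the URI and paths are illustrative):

Configuration jobConf = new Configuration();
DistributedCache.addCacheFile(URI.create("hdfs:///data/lookup.txt"), jobConf);
DistributedCache.addFileToClassPath(new Path("/libs/helper.jar"), jobConf,
    FileSystem.get(jobConf));
// Stamp mtimes and public/private visibility so the TT can validate and
// segregate the localized copies:
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
TrackerDistributedCacheManager.determineTimestamps(jobConf, statCache);
TrackerDistributedCacheManager.determineCacheVisibilities(jobConf, statCache);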
Project: hadoop-2.6.0-cdh5.4.3    File: TestTrackerDistributedCacheManager.java
public void testSameNameFileArchiveCache() throws IOException,
    InterruptedException {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(
      conf, taskController);
  String userName = getJobOwnerName();
  File workDir = new File(TEST_ROOT_DIR, "workdir");
  Path cacheFile = new Path(TEST_ROOT_DIR, "fileArchiveCacheFile");

  createPublicTempFile(cacheFile);
  Configuration conf1 = new Configuration(conf);
  conf1.set("user.name", userName);

  DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
  DistributedCache.addCacheArchive(cacheFile.toUri(), conf1);
  TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
  dumpState(conf1);

  TaskDistributedCacheManager handle = manager
      .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
  handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
      TaskTracker.getPrivateDistributedCacheDir(userName));

  TaskDistributedCacheManager.CacheFile cFile = handle.getCacheFiles().get(0);
  TaskDistributedCacheManager.CacheFile cArchive = handle.getCacheFiles()
      .get(1);

  String distCacheDir = TaskTracker.getPublicDistributedCacheDir();

  Path localizedPathForFile = manager.getLocalCache(cacheFile.toUri(), conf1,
      distCacheDir, fs.getFileStatus(cacheFile), false, cFile.timestamp,
      true, cFile);

  Path localizedPathForArchive = manager.getLocalCache(cacheFile.toUri(),
      conf1, distCacheDir, fs.getFileStatus(cacheFile), true,
      cArchive.timestamp, true, cArchive);
  assertNotSame("File and Archive resolve to the same path: "
      + localizedPathForFile + ". Should differ.", localizedPathForFile,
      localizedPathForArchive);
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestTrackerDistributedCacheManager.java
private void checkLocalizedPath(boolean visibility) 
throws IOException, LoginException, InterruptedException, URISyntaxException {
  TrackerDistributedCacheManager manager = 
    new TrackerDistributedCacheManager(conf, taskController);
  String userName = getJobOwnerName();
  File workDir = new File(TEST_ROOT_DIR, "workdir");
  Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
  if (visibility) {
    createPublicTempFile(cacheFile);
  } else {
    createPrivateTempFile(cacheFile);
  }

  Configuration conf1 = new Configuration(conf);
  conf1.set("user.name", userName);
  DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
  TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
  dumpState(conf1);

  // Task localizing for job
  TaskDistributedCacheManager handle = manager
      .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
  handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
      TaskTracker.getPrivateDistributedCacheDir(userName));
  JobLocalizer.downloadPrivateCache(conf1);
  TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
  String distCacheDir;
  if (visibility) {
    distCacheDir = TaskTracker.getPublicDistributedCacheDir(); 
  } else {
    distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
  }
  Path localizedPath =
    manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
                          fs.getFileStatus(cacheFile), false,
                        c.timestamp, visibility, c);
  assertTrue("Cache file didn't get localized in the expected directory. " +
      "Expected localization to happen within " + 
      ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
      ", but was localized at " + 
      localizedPath, localizedPath.toString().contains(distCacheDir));
  if (visibility) {
    checkPublicFilePermissions(new Path[]{localizedPath});
  } else {
    checkFilePermissions(new Path[]{localizedPath});
  }
}
Project: hadoop-EAR    File: MultiTaskTracker.java
public MultiTaskTrackerMetrics(List<TaskTracker> trackerList) {
  this.trackerList = trackerList;
  MetricsContext context = MetricsUtil.getContext("mapred");
  metricsRecord = MetricsUtil.createRecord(context, "multitasktracker");
  context.registerUpdater(this);
}
Project: hadoop-EAR    File: MultiTaskTracker.java
@Override
public void doUpdates(MetricsContext context) {
  LOG.info("Updating metrics");
  int numTrackers = trackerList.size();
  long totalMapRefill = 0;
  long totalReduceRefill = 0;
  int totalRunningMaps = 0;
  int totalRunningReduces = 0;
  int totalMapSlots = 0;
  int totalReduceSlots = 0;
  for (TaskTracker tracker : trackerList) {
    totalMapRefill += tracker.getAveMapSlotRefillMsecs();
    totalReduceRefill += tracker.getAveReduceSlotRefillMsecs();
    totalRunningMaps += tracker.getRunningMaps();
    totalRunningReduces += tracker.getRunningReduces();
    totalMapSlots += tracker.getMaxActualMapTasks();
    totalReduceSlots += tracker.getMaxActualReduceTasks();

    // If the metrics exists, aggregate the task launch msecs for all
    // trackers
    TaskTrackerInstrumentation instrumentation =
        tracker.getTaskTrackerInstrumentation();
    if (instrumentation != null) {
      MetricsTimeVaryingRate taskLaunchMsecs =
          instrumentation.getTaskLaunchMsecs();
      if (taskLaunchMsecs != null) {
        taskLaunchMsecs.pushMetric(null);
        aggTaskLaunchMsecs.inc(
            taskLaunchMsecs.getPreviousIntervalAverageTime());
      }
    }
  }
  long avgMapRefill = totalMapRefill / numTrackers;
  long avgReduceRefill = totalReduceRefill / numTrackers;
  metricsRecord.setMetric("aveMapSlotRefillMsecs", avgMapRefill);
  metricsRecord.setMetric("aveReduceSlotRefillMsecs", avgReduceRefill);
  metricsRecord.setMetric("maps_running", totalRunningMaps);
  metricsRecord.setMetric("reduces_running", totalRunningReduces);
  metricsRecord.setMetric("mapTaskSlots", totalMapSlots);
  metricsRecord.setMetric("reduceTaskSlots", totalReduceSlots);

  for (MetricsBase metricsBase : registry.getMetricsList()) {
    metricsBase.pushMetric(metricsRecord);
  }

  metricsRecord.update();
}
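One caveat worth noting: the averages above divide by trackerList.size() unguarded, so an empty tracker list would throw ArithmeticException inside the metrics updater. A defensive variant (a sketch, not the project's code):

long avgMapRefill = (numTrackers == 0) ? 0 : totalMapRefill / numTrackers;
long avgReduceRefill = (numTrackers == 0) ? 0 : totalReduceRefill / numTrackers;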
Project: hadoop-EAR    File: MultiTaskTracker.java
public TaskTrackerRunner(int id, TaskTracker tt) {
  super();
  this.ttToRun = tt;
  this.id = id;
}
Project: hadoop-on-lustre    File: TestTrackerDistributedCacheManager.java
  /**
   * This is the typical flow for using the DistributedCache classes.
   * 
   * @throws IOException
   * @throws LoginException
   */
  public void testManagerFlow() throws IOException, LoginException {
    if (!canRun()) {
      return;
    }

    // ****** Imitate JobClient code
    // Configures a task/job with both a regular file and a "classpath" file.
    Configuration subConf = new Configuration(conf);
    String userName = getJobOwnerName();
    subConf.set("user.name", userName);
    JobID jobid = new JobID("jt",1);
    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
    DistributedCache.addFileToClassPath(secondCacheFile, subConf, 
                                        FileSystem.get(subConf));
    TrackerDistributedCacheManager.determineTimestamps(subConf);
    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
    // ****** End of imitating JobClient code

    Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
    FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
    subConf.writeXml(os);
    os.close();

    // ****** Imitate TaskRunner code.
    TrackerDistributedCacheManager manager = 
      new TrackerDistributedCacheManager(conf, taskController);
    TaskDistributedCacheManager handle =
      manager.newTaskDistributedCacheManager(jobid, subConf);
    assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
    File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), 
        TaskTracker.getPrivateDistributedCacheDir(userName));
    JobLocalizer.downloadPrivateCache(subConf);
    // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
//    handle.setupPrivateCache(localDirAllocator, TaskTracker
//        .getPrivateDistributedCacheDir(userName));
//    // ****** End of imitating TaskRunner code

    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
    assertNotNull(null, localCacheFiles);
    assertEquals(2, localCacheFiles.length);
    Path cachedFirstFile = localCacheFiles[0];
    Path cachedSecondFile = localCacheFiles[1];
    assertFileLengthEquals(firstCacheFile, cachedFirstFile);
    assertFalse("Paths should be different.", 
        firstCacheFile.equals(cachedFirstFile));

    assertEquals(1, handle.getClassPaths().size());
    assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));

    checkFilePermissions(localCacheFiles);

    // Cleanup
    handle.release();
    manager.purgeCache();
    assertFalse(pathToFile(cachedFirstFile).exists());
  }
Project: hadoop-on-lustre    File: TestTrackerDistributedCacheManager.java
public void testSameNameFileArchiveCache() throws IOException,
    InterruptedException {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(
      conf, taskController);
  String userName = getJobOwnerName();
  File workDir = new File(TEST_ROOT_DIR, "workdir");
  Path cacheFile = new Path(TEST_ROOT_DIR, "fileArchiveCacheFile");

  createPublicTempFile(cacheFile);
  Configuration conf1 = new Configuration(conf);
  conf1.set("user.name", userName);

  DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
  DistributedCache.addCacheArchive(cacheFile.toUri(), conf1);
  TrackerDistributedCacheManager.determineTimestamps(conf1);
  TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
  dumpState(conf1);

  TaskDistributedCacheManager handle = manager
      .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
  handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
      TaskTracker.getPrivateDistributedCacheDir(userName));

  TaskDistributedCacheManager.CacheFile cFile = handle.getCacheFiles().get(0);
  TaskDistributedCacheManager.CacheFile cArchive = handle.getCacheFiles()
      .get(1);

  String distCacheDir = TaskTracker.getPublicDistributedCacheDir();

  Path localizedPathForFile = manager.getLocalCache(cacheFile.toUri(), conf1,
      distCacheDir, fs.getFileStatus(cacheFile), false, cFile.timestamp,
      true, cFile);

  Path localizedPathForArchive = manager.getLocalCache(cacheFile.toUri(),
      conf1, distCacheDir, fs.getFileStatus(cacheFile), true,
      cArchive.timestamp, true, cArchive);
  assertNotSame("File and Archive resolve to the same path: "
      + localizedPathForFile + ". Should differ.", localizedPathForFile,
      localizedPathForArchive);
}
Project: hadoop-on-lustre    File: TestTrackerDistributedCacheManager.java
private void checkLocalizedPath(boolean visibility) 
throws IOException, LoginException, InterruptedException {
  TrackerDistributedCacheManager manager = 
    new TrackerDistributedCacheManager(conf, taskController);
  String userName = getJobOwnerName();
  File workDir = new File(TEST_ROOT_DIR, "workdir");
  Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
  if (visibility) {
    createPublicTempFile(cacheFile);
  } else {
    createPrivateTempFile(cacheFile);
  }

  Configuration conf1 = new Configuration(conf);
  conf1.set("user.name", userName);
  DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
  TrackerDistributedCacheManager.determineTimestamps(conf1);
  TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
  dumpState(conf1);

  // Task localizing for job
  TaskDistributedCacheManager handle = manager
      .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
  handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
      TaskTracker.getPrivateDistributedCacheDir(userName));
  JobLocalizer.downloadPrivateCache(conf1);
  TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
  String distCacheDir;
  if (visibility) {
    distCacheDir = TaskTracker.getPublicDistributedCacheDir(); 
  } else {
    distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
  }
  Path localizedPath =
    manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
                          fs.getFileStatus(cacheFile), false,
                        c.timestamp, visibility, c);
  assertTrue("Cache file didn't get localized in the expected directory. " +
      "Expected localization to happen within " + 
      ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
      ", but was localized at " + 
      localizedPath, localizedPath.toString().contains(distCacheDir));
  if (visibility) {
    checkPublicFilePermissions(new Path[]{localizedPath});
  } else {
    checkFilePermissions(new Path[]{localizedPath});
  }
}
Project: RDFS    File: MultiTaskTracker.java
public MultiTaskTrackerMetrics(List<TaskTracker> trackerList) {
  this.trackerList = trackerList;
  MetricsContext context = MetricsUtil.getContext("mapred");
  metricsRecord = MetricsUtil.createRecord(context, "multitasktracker");
  context.registerUpdater(this);
}
Project: RDFS    File: MultiTaskTracker.java
@Override
public void doUpdates(MetricsContext context) {
  LOG.info("Updating metrics");
  int numTrackers = trackerList.size();
  long totalMapRefill = 0;
  long totalReduceRefill = 0;
  int totalRunningMaps = 0;
  int totalRunningReduces = 0;
  int totalMapSlots = 0;
  int totalReduceSlots = 0;
  for (TaskTracker tracker : trackerList) {
    totalMapRefill += tracker.getAveMapSlotRefillMsecs();
    totalReduceRefill += tracker.getAveReduceSlotRefillMsecs();
    totalRunningMaps += tracker.getRunningMaps();
    totalRunningReduces += tracker.getRunningReduces();
    totalMapSlots += tracker.getMaxActualMapTasks();
    totalReduceSlots += tracker.getMaxActualReduceTasks();

    // If the metrics exists, aggregate the task launch msecs for all
    // trackers
    TaskTrackerInstrumentation instrumentation =
        tracker.getTaskTrackerInstrumentation();
    if (instrumentation != null) {
      MetricsTimeVaryingRate taskLaunchMsecs =
          instrumentation.getTaskLaunchMsecs();
      if (taskLaunchMsecs != null) {
        taskLaunchMsecs.pushMetric(null);
        aggTaskLaunchMsecs.inc(
            taskLaunchMsecs.getPreviousIntervalAverageTime());
      }
    }
  }
  long avgMapRefill = totalMapRefill / numTrackers;
  long avgReduceRefill = totalReduceRefill / numTrackers;
  metricsRecord.setMetric("aveMapSlotRefillMsecs", avgMapRefill);
  metricsRecord.setMetric("aveReduceSlotRefillMsecs", avgReduceRefill);
  metricsRecord.setMetric("maps_running", totalRunningMaps);
  metricsRecord.setMetric("reduces_running", totalRunningReduces);
  metricsRecord.setMetric("mapTaskSlots", totalMapSlots);
  metricsRecord.setMetric("reduceTaskSlots", totalReduceSlots);

  for (MetricsBase metricsBase : registry.getMetricsList()) {
    metricsBase.pushMetric(metricsRecord);
  }

  metricsRecord.update();
}
Project: RDFS    File: MultiTaskTracker.java
public TaskTrackerRunner(int id, TaskTracker tt) {
  super();
  this.ttToRun = tt;
  this.id = id;
}
Project: hanoi-hadoop-2.0.0-cdh    File: TestTrackerDistributedCacheManager.java
  /**
   * This is the typical flow for using the DistributedCache classes.
   * 
   * @throws IOException
   * @throws LoginException
   */
  public void testManagerFlow() throws IOException, LoginException {
    if (!canRun()) {
      return;
    }

    // ****** Imitate JobClient code
    // Configures a task/job with both a regular file and a "classpath" file.
    Configuration subConf = new Configuration(conf);
    String userName = getJobOwnerName();
    subConf.set("user.name", userName);
    JobID jobid = new JobID("jt",1);
    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
    DistributedCache.addFileToClassPath(secondCacheFile, subConf, 
                                        FileSystem.get(subConf));

    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
    TrackerDistributedCacheManager.determineTimestamps(subConf, statCache);
    TrackerDistributedCacheManager.determineCacheVisibilities(subConf, statCache);
    assertEquals(2, statCache.size());
    // ****** End of imitating JobClient code

    Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
    FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
    subConf.writeXml(os);
    os.close();

    // ****** Imitate TaskRunner code.
    TrackerDistributedCacheManager manager = 
      new TrackerDistributedCacheManager(conf, taskController);
    TaskDistributedCacheManager handle =
      manager.newTaskDistributedCacheManager(jobid, subConf);
    assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
    File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), 
        TaskTracker.getPrivateDistributedCacheDir(userName));
    JobLocalizer.downloadPrivateCache(subConf);
    // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
//    handle.setupPrivateCache(localDirAllocator, TaskTracker
//        .getPrivateDistributedCacheDir(userName));
//    // ****** End of imitating TaskRunner code

    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
    assertNotNull(null, localCacheFiles);
    assertEquals(2, localCacheFiles.length);
    Path cachedFirstFile = localCacheFiles[0];
    Path cachedSecondFile = localCacheFiles[1];
    assertFileLengthEquals(firstCacheFile, cachedFirstFile);
    assertFalse("Paths should be different.", 
        firstCacheFile.equals(cachedFirstFile));

    assertEquals(1, handle.getClassPaths().size());
    assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));

    checkFilePermissions(localCacheFiles);

    // Cleanup
    handle.release();
    manager.purgeCache();
    assertFalse(pathToFile(cachedFirstFile).exists());
  }
Project: hanoi-hadoop-2.0.0-cdh    File: TestTrackerDistributedCacheManager.java
public void testSameNameFileArchiveCache() throws IOException,
    InterruptedException {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(
      conf, taskController);
  String userName = getJobOwnerName();
  File workDir = new File(TEST_ROOT_DIR, "workdir");
  Path cacheFile = new Path(TEST_ROOT_DIR, "fileArchiveCacheFile");

  createPublicTempFile(cacheFile);
  Configuration conf1 = new Configuration(conf);
  conf1.set("user.name", userName);

  DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
  DistributedCache.addCacheArchive(cacheFile.toUri(), conf1);
  TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
  dumpState(conf1);

  TaskDistributedCacheManager handle = manager
      .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
  handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
      TaskTracker.getPrivateDistributedCacheDir(userName));

  TaskDistributedCacheManager.CacheFile cFile = handle.getCacheFiles().get(0);
  TaskDistributedCacheManager.CacheFile cArchive = handle.getCacheFiles()
      .get(1);

  String distCacheDir = TaskTracker.getPublicDistributedCacheDir();

  Path localizedPathForFile = manager.getLocalCache(cacheFile.toUri(), conf1,
      distCacheDir, fs.getFileStatus(cacheFile), false, cFile.timestamp,
      true, cFile);

  Path localizedPathForArchive = manager.getLocalCache(cacheFile.toUri(),
      conf1, distCacheDir, fs.getFileStatus(cacheFile), true,
      cArchive.timestamp, true, cArchive);
  assertNotSame("File and Archive resolve to the same path: "
      + localizedPathForFile + ". Should differ.", localizedPathForFile,
      localizedPathForArchive);
}