Java 类org.apache.hadoop.mapred.ShuffleHandler 实例源码

项目:aliyun-oss-hadoop-fs    文件:TestMapreduceConfigFields.java   
@SuppressWarnings("deprecation")
 @Override
 public void initializeMemberVariables() {
   xmlFilename = new String("mapred-default.xml");
   configurationClasses = new Class[] { MRJobConfig.class, MRConfig.class,
       JHAdminConfig.class, ShuffleHandler.class, FileOutputFormat.class,
FileInputFormat.class, Job.class, NLineInputFormat.class,
JobConf.class, FileOutputCommitter.class };

   // Initialize used variables
   configurationPropsToSkipCompare = new HashSet<String>();

   // Set error modes
   errorIfMissingConfigProps = true;
   errorIfMissingXmlProps = false;

   // Ignore deprecated MR1 properties in JobConf
   configurationPropsToSkipCompare
           .add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY);
   configurationPropsToSkipCompare
           .add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
 }
项目:hops    文件:TestMapreduceConfigFields.java   
@Override
 public void initializeMemberVariables() {
   xmlFilename = new String("mapred-default.xml");
   configurationClasses = new Class[] { MRJobConfig.class, MRConfig.class,
       JHAdminConfig.class, ShuffleHandler.class, FileOutputFormat.class,
FileInputFormat.class, Job.class, NLineInputFormat.class,
JobConf.class, FileOutputCommitter.class };

   // Initialize used variables
   configurationPropsToSkipCompare = new HashSet<String>();
   xmlPropsToSkipCompare = new HashSet<String>();

   // Set error modes
   errorIfMissingConfigProps = true;
   errorIfMissingXmlProps = false;

   // Ignore deprecated MR1 properties in JobConf
   configurationPropsToSkipCompare
           .add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY);
   configurationPropsToSkipCompare
           .add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);

   // Obsolete entries listed in MAPREDUCE-6057 were removed from trunk
   // but not removed from branch-2.
   xmlPropsToSkipCompare.add("map.sort.class");
   xmlPropsToSkipCompare.add("mapreduce.local.clientfactory.class.name");
   xmlPropsToSkipCompare.add("mapreduce.jobtracker.system.dir");
   xmlPropsToSkipCompare.add("mapreduce.jobtracker.staging.root.dir");
 }
项目:hadoop    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:hadoop    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:hadoop    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:aliyun-oss-hadoop-fs    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:aliyun-oss-hadoop-fs    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:big-c    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:big-c    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:big-c    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:hadoop-2.6.0-cdh5.4.3    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:hadoop-plus    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();
    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:hadoop-plus    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:hadoop-plus    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:search    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(Locale.ENGLISH, YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:FlexMap    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    LOG.info("launch remote task on"+this.containerMgrAddress+"from"+this.taskAttemptID.toString());

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);

    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:FlexMap    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:FlexMap    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:hops    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);

    if (getConfig().getBoolean(CommonConfigurationKeysPublic
            .IPC_SERVER_SSL_ENABLED,
        CommonConfigurationKeysPublic.IPC_SERVER_SSL_ENABLED_DEFAULT)) {
      String user = UserGroupInformation.getCurrentUser().getUserName();
      setupCryptoMaterial(requestList, user);
    }

    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:hops    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:hops    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:hadoop-TCP    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:hadoop-TCP    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:hadoop-TCP    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:hardfs    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:hardfs    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:hardfs    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
项目:hadoop-on-lustre2    文件:ContainerLauncherImpl.java   
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
  LOG.info("Launching " + taskAttemptID);
  if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
    state = ContainerState.DONE;
    sendContainerLaunchFailedMsg(taskAttemptID, 
        "Container was killed before it was launched");
    return;
  }

  ContainerManagementProtocolProxyData proxy = null;
  try {

    proxy = getCMProxy(containerMgrAddress, containerID);

    // Construct the actual Container
    ContainerLaunchContext containerLaunchContext =
      event.getContainerLaunchContext();

    // Now launch the actual container
    StartContainerRequest startRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          event.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startRequest);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    StartContainersResponse response =
        proxy.getContainerManagementProtocol().startContainers(requestList);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(containerID)) {
      throw response.getFailedRequests().get(containerID).deSerialize();
    }
    ByteBuffer portInfo =
        response.getAllServicesMetaData().get(
            ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
    int port = -1;
    if(portInfo != null) {
      port = ShuffleHandler.deserializeMetaData(portInfo);
    }
    LOG.info("Shuffle port returned by ContainerManager for "
        + taskAttemptID + " : " + port);

    if(port < 0) {
      this.state = ContainerState.FAILED;
      throw new IllegalStateException("Invalid shuffle port number "
          + port + " returned for " + taskAttemptID);
    }

    // after launching, send launched event to task attempt to move
    // it from ASSIGNED to RUNNING state
    context.getEventHandler().handle(
        new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
    this.state = ContainerState.RUNNING;
  } catch (Throwable t) {
    String message = "Container launch failed for " + containerID + " : "
        + StringUtils.stringifyException(t);
    this.state = ContainerState.FAILED;
    sendContainerLaunchFailedMsg(taskAttemptID, message);
  } finally {
    if (proxy != null) {
      cmProxy.mayBeCloseProxy(proxy);
    }
  }
}
项目:hadoop-on-lustre2    文件:TestContainerLauncherImpl.java   
@Before
public void setup() throws IOException {
  serviceResponse.clear();
  serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
      ShuffleHandler.serializeMetaData(80));
}
项目:hadoop-on-lustre2    文件:MiniMRYarnCluster.java   
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}