Java class org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId usage examples (source code)

Project: aliyun-oss-hadoop-fs    File: FileSystemTimelineWriter.java
public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
    long ttl) {
  domainLogFD = null;
  summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
  entityLogFDs = new HashMap<ApplicationAttemptId,
      HashMap<TimelineEntityGroupId, EntityLogFD>>();
  this.flushTimer =
      new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
        true);
  this.flushTimerTask = new FlushTimerTask();
  this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
      flushIntervalSecs * 1000);

  this.cleanInActiveFDsTimer =
      new Timer(LogFDsCache.class.getSimpleName() +
        "cleanInActiveFDsTimer", true);
  this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
  this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
      cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
  this.ttl = ttl * 1000;
}
Project: aliyun-oss-hadoop-fs    File: FileSystemTimelineWriter.java
private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap<
    TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  if (!logFDs.isEmpty()) {
    for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
             EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
      HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
          = logFDMapEntry.getValue();
      for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
          : logFDMap.entrySet()) {
        EntityLogFD logFD = logFDEntry.getValue();
        try {
          logFD.lock();
          logFD.flush();
        } finally {
          logFD.unlock();
        }
      }
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: FileSystemTimelineWriter.java
private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs,
    long currentTimeStamp) {
  if (!logFDs.isEmpty()) {
    for (Entry<ApplicationAttemptId, HashMap<
             TimelineEntityGroupId, EntityLogFD>> logFDMapEntry
            : logFDs.entrySet()) {
      HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
          = logFDMapEntry.getValue();
      for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
          : logFDMap.entrySet()) {
        EntityLogFD logFD = logFDEntry.getValue();
        try {
          logFD.lock();
          if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
            logFD.close();
          }
        } finally {
          logFD.unlock();
        }
      }
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: FileSystemTimelineWriter.java
private void closeEntityFDs(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
  try {
    entityTableLocker.lock();
    if (!logFDs.isEmpty()) {
      for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
               EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
        HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
            = logFDMapEntry.getValue();
        for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
            : logFDMap.entrySet()) {
          EntityLogFD logFD = logFDEntry.getValue();
          try {
            logFD.lock();
            logFD.close();
          } finally {
            logFD.unlock();
          }
        }
      }
    }
  } finally {
    entityTableLocker.unlock();
  }
}
Project: aliyun-oss-hadoop-fs    File: TestTimelineEntityGroupId.java
@Test
public void testTimelineEntityGroupId() {
  ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
  ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
  TimelineEntityGroupId group1 = TimelineEntityGroupId.newInstance(appId1, "1");
  TimelineEntityGroupId group2 = TimelineEntityGroupId.newInstance(appId1, "2");
  TimelineEntityGroupId group3 = TimelineEntityGroupId.newInstance(appId2, "1");
  TimelineEntityGroupId group4 = TimelineEntityGroupId.newInstance(appId1, "1");

  Assert.assertTrue(group1.equals(group4));
  Assert.assertFalse(group1.equals(group2));
  Assert.assertFalse(group1.equals(group3));

  Assert.assertTrue(group1.compareTo(group4) == 0);
  Assert.assertTrue(group1.compareTo(group2) < 0);
  Assert.assertTrue(group1.compareTo(group3) < 0);

  Assert.assertTrue(group1.hashCode() == group4.hashCode());
  Assert.assertFalse(group1.hashCode() == group2.hashCode());
  Assert.assertFalse(group1.hashCode() == group3.hashCode());

  Assert.assertEquals("timelineEntityGroupId_1234_1_1", group1.toString());
  Assert.assertEquals(TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"), group1);
}
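The test above exercises the full contract of TimelineEntityGroupId: equals/hashCode/compareTo plus the string round-trip through toString() and fromString(). As a minimal standalone sketch of that round-trip (the wrapper class and main method are only for illustration; the API calls are the same ones the test uses):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;

public class TimelineEntityGroupIdRoundTrip {
  public static void main(String[] args) {
    // Cluster timestamp 1234, application sequence number 1.
    ApplicationId appId = ApplicationId.newInstance(1234, 1);
    // Group "1" inside that application.
    TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(appId, "1");

    // Serialized form: timelineEntityGroupId_<clusterTimestamp>_<appSeq>_<group>
    String serialized = groupId.toString(); // "timelineEntityGroupId_1234_1_1"

    // fromString() reverses toString(), so the parsed id equals the original.
    TimelineEntityGroupId parsed = TimelineEntityGroupId.fromString(serialized);
    System.out.println(serialized + " equals original: " + parsed.equals(groupId));
  }
}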
Project: hops    File: FileSystemTimelineWriter.java
public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
    long ttl, long timerTaskRetainTTL) {
  domainLogFD = null;
  summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
  entityLogFDs = new HashMap<ApplicationAttemptId,
      HashMap<TimelineEntityGroupId, EntityLogFD>>();
  this.ttl = ttl * 1000;
  this.flushIntervalSecs = flushIntervalSecs;
  this.cleanIntervalSecs = cleanIntervalSecs;
  long timerTaskRetainTTLVar = timerTaskRetainTTL * 1000;
  if (timerTaskRetainTTLVar > this.ttl) {
    this.timerTaskRetainTTL = timerTaskRetainTTLVar;
  } else {
    this.timerTaskRetainTTL = this.ttl + 2 * 60 * 1000;
    LOG.warn("The specific " + YarnConfiguration
        .TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS + " : "
        + timerTaskRetainTTL + " is invalid, because it is less than or "
        + "equal to " + YarnConfiguration
        .TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " + ttl + ". Use "
        + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : "
        + ttl + " + 120s instead.");
  }
  ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  this.timerTasksMonitorReadLock = lock.readLock();
  this.timerTasksMonitorWriteLock = lock.writeLock();
}
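This variant of the constructor additionally checks that the internal timer-task retain TTL is strictly greater than the file-descriptor retain TTL, and falls back to ttl + 120s with a warning otherwise. A hypothetical client-side configuration that satisfies that check, using the same YarnConfiguration keys referenced in the warning message (the numeric values are illustrative only):

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineFdTtlConfigSketch {
  public static YarnConfiguration newConf() {
    YarnConfiguration conf = new YarnConfiguration();
    // Retain idle entity-log file descriptors for 5 minutes.
    conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS, 300);
    // Keep the internal flush/clean timer tasks alive for 10 minutes, which is
    // greater than the FD retain TTL and therefore passes the check above.
    conf.setLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS, 600);
    return conf;
  }
}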
Project: hops    File: FileSystemTimelineWriter.java
private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap<
    TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  if (!logFDs.isEmpty()) {
    for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
             EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
      HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
          = logFDMapEntry.getValue();
      for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
          : logFDMap.entrySet()) {
        EntityLogFD logFD = logFDEntry.getValue();
        try {
          logFD.lock();
          logFD.flush();
        } finally {
          logFD.unlock();
        }
      }
    }
  }
}
Project: hops    File: FileSystemTimelineWriter.java
private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs,
    long currentTimeStamp) {
  if (!logFDs.isEmpty()) {
    for (Entry<ApplicationAttemptId, HashMap<
             TimelineEntityGroupId, EntityLogFD>> logFDMapEntry
            : logFDs.entrySet()) {
      HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
          = logFDMapEntry.getValue();
      for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
          : logFDMap.entrySet()) {
        EntityLogFD logFD = logFDEntry.getValue();
        try {
          logFD.lock();
          if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
            logFD.close();
          }
        } finally {
          logFD.unlock();
        }
      }
    }
  }
}
Project: hops    File: FileSystemTimelineWriter.java
private void closeEntityFDs(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
  try {
    entityTableLocker.lock();
    if (!logFDs.isEmpty()) {
      for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
               EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
        HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
            = logFDMapEntry.getValue();
        for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
            : logFDMap.entrySet()) {
          EntityLogFD logFD = logFDEntry.getValue();
          try {
            logFD.lock();
            logFD.close();
          } finally {
            logFD.unlock();
          }
        }
      }
    }
  } finally {
    entityTableLocker.unlock();
  }
}
Project: hops    File: TestTimelineEntityGroupId.java
@Test
public void testTimelineEntityGroupId() {
  ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
  ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
  TimelineEntityGroupId group1 = TimelineEntityGroupId.newInstance(appId1, "1");
  TimelineEntityGroupId group2 = TimelineEntityGroupId.newInstance(appId1, "2");
  TimelineEntityGroupId group3 = TimelineEntityGroupId.newInstance(appId2, "1");
  TimelineEntityGroupId group4 = TimelineEntityGroupId.newInstance(appId1, "1");

  Assert.assertTrue(group1.equals(group4));
  Assert.assertFalse(group1.equals(group2));
  Assert.assertFalse(group1.equals(group3));

  Assert.assertTrue(group1.compareTo(group4) == 0);
  Assert.assertTrue(group1.compareTo(group2) < 0);
  Assert.assertTrue(group1.compareTo(group3) < 0);

  Assert.assertTrue(group1.hashCode() == group4.hashCode());
  Assert.assertFalse(group1.hashCode() == group2.hashCode());
  Assert.assertFalse(group1.hashCode() == group3.hashCode());

  Assert.assertEquals("timelineEntityGroupId_1234_1_1", group1.toString());
  Assert.assertEquals(TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"), group1);
}
Project: hops    File: EntityGroupFSTimelineStore.java
synchronized void loadDetailLog(TimelineDataManager tdm,
    TimelineEntityGroupId groupId) throws IOException {
  List<LogInfo> removeList = new ArrayList<>();
  for (LogInfo log : detailLogs) {
    LOG.debug("Try refresh logs for {}", log.getFilename());
    // Only refresh the log that matches the cache id
    if (log.matchesGroupId(groupId)) {
      Path dirPath = getAppDirPath();
      if (fs.exists(log.getPath(dirPath))) {
        LOG.debug("Refresh logs for cache id {}", groupId);
        log.parseForStore(tdm, dirPath, isDone(),
            jsonFactory, objMapper, fs);
      } else {
        // The log may have been removed, remove the log
        removeList.add(log);
        LOG.info(
            "File {} no longer exists, removing it from log list",
            log.getPath(dirPath));
      }
    }
  }
  detailLogs.removeAll(removeList);
}
Project: hops    File: EntityGroupFSTimelineStore.java
private List<TimelineStore> getTimelineStoresFromCacheIds(
    Set<TimelineEntityGroupId> groupIds, String entityType,
    List<EntityCacheItem> cacheItems)
    throws IOException {
  List<TimelineStore> stores = new LinkedList<TimelineStore>();
  // For now we just handle one store in a context. We return the first
  // non-null storage for the group ids.
  for (TimelineEntityGroupId groupId : groupIds) {
    TimelineStore storeForId = getCachedStore(groupId, cacheItems);
    if (storeForId != null) {
      LOG.debug("Adding {} as a store for the query", storeForId.getName());
      stores.add(storeForId);
      metrics.incrGetEntityToDetailOps();
    }
  }
  if (stores.size() == 0) {
    LOG.debug("Using summary store for {}", entityType);
    stores.add(this.summaryStore);
    metrics.incrGetEntityToSummaryOps();
  }
  return stores;
}
Project: hops    File: EntityGroupFSTimelineStore.java
protected List<TimelineStore> getTimelineStoresForRead(String entityId,
    String entityType, List<EntityCacheItem> cacheItems)
    throws IOException {
  Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
  for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
    LOG.debug("Trying plugin {} for id {} and type {}",
        cacheIdPlugin.getClass().getName(), entityId, entityType);
    Set<TimelineEntityGroupId> idsFromPlugin
        = cacheIdPlugin.getTimelineEntityGroupId(entityId, entityType);
    if (idsFromPlugin == null) {
      LOG.debug("Plugin returned null " + cacheIdPlugin.getClass().getName());
    } else {
      LOG.debug("Plugin returned ids: " + idsFromPlugin);
    }

    if (idsFromPlugin != null) {
      groupIds.addAll(idsFromPlugin);
      LOG.debug("plugin {} returns a non-null value on query",
          cacheIdPlugin.getClass().getName());
    }
  }
  return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
}
Project: hops    File: EntityGroupFSTimelineStore.java
private List<TimelineStore> getTimelineStoresForRead(String entityType,
    NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
    List<EntityCacheItem> cacheItems) throws IOException {
  Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
  for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
    Set<TimelineEntityGroupId> idsFromPlugin =
        cacheIdPlugin.getTimelineEntityGroupId(entityType, primaryFilter,
            secondaryFilters);
    if (idsFromPlugin != null) {
      LOG.debug("plugin {} returns a non-null value on query {}",
          cacheIdPlugin.getClass().getName(), idsFromPlugin);
      groupIds.addAll(idsFromPlugin);
    }
  }
  return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
}
Project: tez    File: ATSV15HistoryLoggingService.java
private void logEntity(TimelineEntityGroupId groupId, TimelineEntity entity, String domainId) {
  if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
    historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
  }

  try {
    TimelinePutResponse response = timelineClient.putEntities(
        appContext.getApplicationAttemptId(), groupId, entity);
    if (response != null
        && !response.getErrors().isEmpty()) {
      int count = response.getErrors().size();
      for (int i = 0; i < count; ++i) {
        TimelinePutError err = response.getErrors().get(i);
        if (err.getErrorCode() != 0) {
          LOG.warn("Could not post history event to ATS"
              + ", atsPutError=" + err.getErrorCode()
              + ", entityId=" + err.getEntityId());
        }
      }
    }
    // Do nothing additional, ATS client library should handle throttling
    // or auto-disable as needed
  } catch (Exception e) {
    LOG.warn("Could not handle history events", e);
  }
}
Project: tez    File: TimelineCachePluginImpl.java
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
    SortedSet<String> entityIds, Set<String> eventTypes) {
  if (!knownEntityTypes.contains(entityType)
      || summaryEntityTypes.contains(entityType)
      || entityIds == null || entityIds.isEmpty()) {
    return null;
  }

  Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
  for (String entityId : entityIds) {
    Set<TimelineEntityGroupId> groupId = convertToTimelineEntityGroupIds(entityType, entityId);
    if (groupId != null) {
      groupIds.addAll(groupId);
    }
  }
  return groupIds;
}
Project: tez    File: TestTimelineCachePluginImpl.java
@Test
public void testGetTimelineEntityGroupIdByPrimaryFilter() {
  TimelineCachePluginImpl plugin = createPlugin(100, null);
  for (Entry<String, String> entry : typeIdMap1.entrySet()) {
    NameValuePair primaryFilter = new NameValuePair(entry.getKey(), entry.getValue());
    Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION.name(),
        primaryFilter, null));
    Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getKey(), primaryFilter, null);
    if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }
    Assert.assertEquals(2, groupIds.size());
    Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
    while (iter.hasNext()) {
      TimelineEntityGroupId groupId = iter.next();
      Assert.assertEquals(appId1, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID1, 100).contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
Project: tez    File: TestTimelineCachePluginImpl.java
@Test
public void testGetTimelineEntityGroupIdByIdDefaultConfig() {
  TimelineCachePluginImpl plugin = createPlugin(-1, null);
  for (Entry<String, String> entry : typeIdMap1.entrySet()) {
    Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
    if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }
    Assert.assertEquals(1, groupIds.size());
    Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
    while (iter.hasNext()) {
      TimelineEntityGroupId groupId = iter.next();
      Assert.assertEquals(appId1, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID1).contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
Project: tez    File: TestTimelineCachePluginImpl.java
@Test
public void testGetTimelineEntityGroupIdByIdNoGroupingConf() {
  TimelineCachePluginImpl plugin = createPlugin(1, null);
  for (Entry<String, String> entry : typeIdMap1.entrySet()) {
    Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
    if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }
    Assert.assertEquals(1, groupIds.size());
    Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
    while (iter.hasNext()) {
      TimelineEntityGroupId groupId = iter.next();
      Assert.assertEquals(appId1, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID1).contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
Project: tez    File: TestTimelineCachePluginImpl.java
@Test
public void testGetTimelineEntityGroupIdById() {
  TimelineCachePluginImpl plugin = createPlugin(100, null);
  for (Entry<String, String> entry : typeIdMap1.entrySet()) {
    Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
    if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }
    Assert.assertEquals(2, groupIds.size());
    Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
    while (iter.hasNext()) {
      TimelineEntityGroupId groupId = iter.next();
      Assert.assertEquals(appId1, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID1, 100).contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
Project: tez    File: TestTimelineCachePluginImpl.java
@Test
public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsSingle() {
  TimelineCachePluginImpl plugin = createPlugin(100, "50");
  for (Entry<String, String> entry : typeIdMap2.entrySet()) {
    Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
    if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }
    Assert.assertEquals(3, groupIds.size());
    Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
    while (iter.hasNext()) {
      TimelineEntityGroupId groupId = iter.next();
      Assert.assertEquals(appId2, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID2, 100, 50).contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
Project: tez    File: TestTimelineCachePluginImpl.java
@Test
public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsMultiple() {
  TimelineCachePluginImpl plugin = createPlugin(100, "25, 50");
  for (Entry<String, String> entry : typeIdMap2.entrySet()) {
    Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
    if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }
    Assert.assertEquals(4, groupIds.size());
    Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
    while (iter.hasNext()) {
      TimelineEntityGroupId groupId = iter.next();
      Assert.assertEquals(appId2, groupId.getApplicationId());
      Assert.assertTrue(
          getGroupIds(dagID2, 100, 25, 50).contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
Project: tez    File: TestTimelineCachePluginImpl.java
@Test
public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsEmpty() {
  TimelineCachePluginImpl plugin = createPlugin(100, "");
  for (Entry<String, String> entry : typeIdMap2.entrySet()) {
    Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
    if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }
    Assert.assertEquals(2, groupIds.size());
    Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
    while (iter.hasNext()) {
      TimelineEntityGroupId groupId = iter.next();
      Assert.assertEquals(appId2, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID2, 100).contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: FileSystemTimelineWriter.java
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
    EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
  try {
    entityTableCopyLocker.lock();
    return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
        EntityLogFD>>(entityLogFDsToCopy);
  } finally {
    entityTableCopyLocker.unlock();
  }
}
Project: aliyun-oss-hadoop-fs    File: FileSystemTimelineWriter.java
public void writeEntityLogs(FileSystem fs, Path entityLogPath,
    ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
    boolean isAppendSupported) throws IOException {
  writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
      groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
}
Project: aliyun-oss-hadoop-fs    File: FileSystemTimelineWriter.java
private void writeEntityLogs(FileSystem fs, Path logPath,
    ObjectMapper objMapper, ApplicationAttemptId attemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entities,
    boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
        TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  HashMap<TimelineEntityGroupId, EntityLogFD> logMapFD
      = logFDs.get(attemptId);
  if (logMapFD != null) {
    EntityLogFD logFD = logMapFD.get(groupId);
    if (logFD != null) {
      try {
        logFD.lock();
        if (serviceStopped) {
          return;
        }
        logFD.writeEntities(entities);
      } finally {
        logFD.unlock();
      }
    } else {
      createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
          entities, isAppendSupported, logFDs);
    }
  } else {
    createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
        entities, isAppendSupported, logFDs);
  }
}
Project: aliyun-oss-hadoop-fs    File: FileSystemTimelineWriter.java
private void createEntityFDandWrite(FileSystem fs, Path logPath,
    ObjectMapper objMapper, ApplicationAttemptId attemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entities,
    boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
        TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  try {
    entityTableLocker.lock();
    if (serviceStopped) {
      return;
    }
    HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap =
        logFDs.get(attemptId);
    if (logFDMap == null) {
      logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>();
    }
    EntityLogFD logFD = logFDMap.get(groupId);
    if (logFD == null) {
      logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
    }
    try {
      logFD.lock();
      logFD.writeEntities(entities);
      try {
        entityTableCopyLocker.lock();
        logFDMap.put(groupId, logFD);
        logFDs.put(attemptId, logFDMap);
      } finally {
        entityTableCopyLocker.unlock();
      }
    } finally {
      logFD.unlock();
    }
  } finally {
    entityTableLocker.unlock();
  }
}
Project: aliyun-oss-hadoop-fs    File: TimelineClientImpl.java
@Override
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, TimelineEntity... entities)
    throws IOException, YarnException {
  if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
    throw new YarnException(
      "This API is not supported under current Timeline Service Version: "
          + timelineServiceVersion);
  }

  return timelineWriter.putEntities(appAttemptId, groupId, entities);
}
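putEntities() with an ApplicationAttemptId and a TimelineEntityGroupId is only accepted when the client is configured for Timeline Service v1.5; otherwise it throws a YarnException. A hedged caller-side sketch, assuming a reachable Timeline Server and the yarn.timeline-service.version setting that TimelineClientImpl reads (the entity type/id and the group name below are illustrative, not from the excerpt):

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineV15PutSketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    // The version check above requires 1.5 on the client side.
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);

    TimelineClient client = TimelineClient.createTimelineClient();
    client.init(conf);
    client.start();
    try {
      ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
      ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
      TimelineEntityGroupId groupId =
          TimelineEntityGroupId.newInstance(appId, "group_1");

      TimelineEntity entity = new TimelineEntity();
      entity.setEntityType("MY_ENTITY_TYPE"); // illustrative type
      entity.setEntityId("entity_001");       // illustrative id
      entity.setStartTime(System.currentTimeMillis());

      // Delegates to timelineWriter.putEntities(attemptId, groupId, entities).
      TimelinePutResponse response = client.putEntities(attemptId, groupId, entity);
      System.out.println("put errors: " + response.getErrors().size());
    } finally {
      client.stop();
    }
  }
}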
Project: hops    File: FileSystemTimelineWriter.java
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
    EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
  try {
    entityTableCopyLocker.lock();
    return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
        EntityLogFD>>(entityLogFDsToCopy);
  } finally {
    entityTableCopyLocker.unlock();
  }
}
Project: hops    File: FileSystemTimelineWriter.java
public void writeEntityLogs(FileSystem fs, Path entityLogPath,
    ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
    boolean isAppendSupported) throws IOException {
  checkAndStartTimeTasks();
  writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
      groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
}
Project: hops    File: FileSystemTimelineWriter.java
private void writeEntityLogs(FileSystem fs, Path logPath,
    ObjectMapper objMapper, ApplicationAttemptId attemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entities,
    boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
        TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  HashMap<TimelineEntityGroupId, EntityLogFD> logMapFD
      = logFDs.get(attemptId);
  if (logMapFD != null) {
    EntityLogFD logFD = logMapFD.get(groupId);
    if (logFD != null) {
      try {
        logFD.lock();
        if (serviceStopped) {
          return;
        }
        logFD.writeEntities(entities);
      } finally {
        logFD.unlock();
      }
    } else {
      createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
          entities, isAppendSupported, logFDs);
    }
  } else {
    createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
        entities, isAppendSupported, logFDs);
  }
}
Project: hops    File: FileSystemTimelineWriter.java
private void createEntityFDandWrite(FileSystem fs, Path logPath,
    ObjectMapper objMapper, ApplicationAttemptId attemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entities,
    boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
        TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  try {
    entityTableLocker.lock();
    if (serviceStopped) {
      return;
    }
    HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap =
        logFDs.get(attemptId);
    if (logFDMap == null) {
      logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>();
    }
    EntityLogFD logFD = logFDMap.get(groupId);
    if (logFD == null) {
      logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
    }
    try {
      logFD.lock();
      logFD.writeEntities(entities);
      try {
        entityTableCopyLocker.lock();
        logFDMap.put(groupId, logFD);
        logFDs.put(attemptId, logFDMap);
      } finally {
        entityTableCopyLocker.unlock();
      }
    } finally {
      logFD.unlock();
    }
  } finally {
    entityTableLocker.unlock();
  }
}
Project: hops    File: TimelineClientImpl.java
@Override
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, TimelineEntity... entities)
    throws IOException, YarnException {
  if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
    throw new YarnException(
      "This API is not supported under current Timeline Service Version: "
          + timelineServiceVersion);
  }

  return timelineWriter.putEntities(appAttemptId, groupId, entities);
}
Project: hops    File: EntityGroupPlugInForTest.java
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
    NameValuePair primaryFilter,
    Collection<NameValuePair> secondaryFilters) {
  ApplicationId appId = ApplicationId.fromString(
      primaryFilter.getValue().toString());
  return Sets.newHashSet(getStandardTimelineGroupId(appId));
}
Project: hops    File: EntityGroupPlugInForTest.java
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
    String entityType) {
  ApplicationId appId = ApplicationId.fromString(
      entityId);
  return Sets.newHashSet(getStandardTimelineGroupId(appId));
}
Project: hops    File: DistributedShellTimelinePlugin.java
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
    NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters) {
  if (ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityType)) {
    if (primaryFilter == null) {
      return null;
    }
    return toEntityGroupId(primaryFilter.getValue().toString());
  }
  return null;
}
Project: hops    File: DistributedShellTimelinePlugin.java
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
    String entityType) {
  if (ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityId)) {
    ContainerId containerId = ContainerId.fromString(entityId);
    ApplicationId appId = containerId.getApplicationAttemptId()
        .getApplicationId();
    return toEntityGroupId(appId.toString());
  }
  return null;
}
Project: hops    File: DistributedShellTimelinePlugin.java
private Set<TimelineEntityGroupId> toEntityGroupId(String strAppId) {
  ApplicationId appId = ApplicationId.fromString(strAppId);
  TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
      appId, ApplicationMaster.CONTAINER_ENTITY_GROUP_ID);
  Set<TimelineEntityGroupId> result = new HashSet<>();
  result.add(groupId);
  return result;
}
Project: hops    File: ApplicationMaster.java
private TimelinePutResponse putContainerEntity(
    TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
    TimelineEntity entity)
    throws YarnException, IOException {
  if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
    TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
        currAttemptId.getApplicationId(),
        CONTAINER_ENTITY_GROUP_ID);
    return timelineClient.putEntities(currAttemptId, groupId, entity);
  } else {
    return timelineClient.putEntities(entity);
  }
}
Project: tez    File: ATSV15HistoryLoggingService.java
@VisibleForTesting
public TimelineEntityGroupId getGroupId(DAGHistoryEvent event) {
  // Changing this function will impact TimelineCachePluginImpl and should be done very
  // carefully to account for handling different versions of Tez
  switch (event.getHistoryEvent().getEventType()) {
    case DAG_SUBMITTED:
    case DAG_INITIALIZED:
    case DAG_STARTED:
    case DAG_FINISHED:
    case DAG_KILL_REQUEST:
    case VERTEX_INITIALIZED:
    case VERTEX_STARTED:
    case VERTEX_CONFIGURE_DONE:
    case VERTEX_FINISHED:
    case TASK_STARTED:
    case TASK_FINISHED:
    case TASK_ATTEMPT_STARTED:
    case TASK_ATTEMPT_FINISHED:
    case DAG_COMMIT_STARTED:
    case VERTEX_COMMIT_STARTED:
    case VERTEX_GROUP_COMMIT_STARTED:
    case VERTEX_GROUP_COMMIT_FINISHED:
    case DAG_RECOVERED:
      String entityGroupId = numDagsPerGroup > 1
          ? event.getDagID().getGroupId(numDagsPerGroup)
          : event.getDagID().toString();
      return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(), entityGroupId);
    case APP_LAUNCHED:
    case AM_LAUNCHED:
    case AM_STARTED:
    case CONTAINER_LAUNCHED:
    case CONTAINER_STOPPED:
      return TimelineEntityGroupId.newInstance(appContext.getApplicationID(),
          appContext.getApplicationID().toString());
  }
  return null;
}
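As read from the switch above, DAG-scoped events (DAG, vertex, task, and attempt lifecycle plus commit events) share a group keyed by the DAG id, or by a batch of numDagsPerGroup DAGs when grouping is enabled, while AM and container lifecycle events fall into a single application-wide group whose group name is the application id itself. A rough illustration of the two resulting group ids, assuming Tez's TezDAGID.getInstance(ApplicationId, int) factory (the numDagsPerGroup value is illustrative):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.tez.dag.records.TezDAGID;

public class TezGroupIdSketch {
  public static void main(String[] args) {
    ApplicationId appId = ApplicationId.newInstance(1234, 1);
    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
    int numDagsPerGroup = 100; // illustrative; mirrors the field used above

    // DAG-scoped events: one group per DAG, or per batch of DAGs.
    String dagGroupName = numDagsPerGroup > 1
        ? dagId.getGroupId(numDagsPerGroup)
        : dagId.toString();
    TimelineEntityGroupId dagScoped =
        TimelineEntityGroupId.newInstance(appId, dagGroupName);

    // AM/container lifecycle events: one application-wide group.
    TimelineEntityGroupId appScoped =
        TimelineEntityGroupId.newInstance(appId, appId.toString());

    System.out.println(dagScoped + " / " + appScoped);
  }
}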