tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-3357. Change TimelineCachePlugin to handle DAG grouping. (Harish Jaiprakash via hitesh)
Date Thu, 21 Jul 2016 18:30:37 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 48f9f4c2a -> 98e352627


TEZ-3357. Change TimelineCachePlugin to handle DAG grouping. (Harish Jaiprakash via hitesh)

(cherry picked from commit 9930011b0f05d56ece049867a71fb7eebfe6442e)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/98e35262
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/98e35262
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/98e35262

Branch: refs/heads/branch-0.8
Commit: 98e352627aec1d21c06d082abc22c8cd2a7ee470
Parents: 48f9f4c
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Jul 21 11:23:33 2016 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Jul 21 11:25:39 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/TezConfiguration.java    |  34 +++-
 .../org/apache/tez/dag/records/TezDAGID.java    |  19 +++
 .../logging/ats/TimelineCachePluginImpl.java    | 110 ++++++++----
 .../ats/TestTimelineCachePluginImpl.java        | 170 ++++++++++++++++---
 5 files changed, 273 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/98e35262/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 098b525..ce6440d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
   TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
   TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed before initialization.

http://git-wip-us.apache.org/repos/asf/tez/blob/98e35262/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 1de2eda..11c50cf 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1229,6 +1229,38 @@ public class TezConfiguration extends Configuration {
       "org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService";
 
   /**
+   * Comma separated list of Integers. These are the values that were set for the config
value
+   * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are
required so that
+   * the groupIds generated previously will continue to be generated by the plugin. If an
older
+   * value is not present then the UI may not show information for DAGs which were created
+   * with a different grouping value.
+   *
+   * Note: Do not add too many values here as it will affect the performance of Yarn Timeline
+   * Server/Tez UI due to the need to scan for more log files.
+   */
+  @Private
+  @Unstable
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty
+  public static final String TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP
=
+      TEZ_PREFIX + "history.logging.timeline-cache-plugin.old-num-dags-per-group";
+
+  /**
+   * Integer value. Number of DAGs to be grouped together. This is used by the history logging
+   * service to generate groupIds such that numDagsPerGroup will have same groupId in a given
+   * session. If the value is set to 1 then we disable grouping. This config is used to control
the
+   * number of DAGs written into one log file, and hence controls number of files created
in
+   * the Filesystem used by YARN Timeline.
+   */
+  @Private
+  @Unstable
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP =
+      TEZ_PREFIX + "history.timeline.num-dags-per-group";
+  public static final int TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT = 1;
+
+  /**
    * String value. The directory into which history data will be written. This defaults to
the 
    * container logging directory. This is relevant only when SimpleHistoryLoggingService
is being
    * used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}
@@ -1237,7 +1269,7 @@ public class TezConfiguration extends Configuration {
   @ConfigurationProperty
   public static final String TEZ_SIMPLE_HISTORY_LOGGING_DIR =
       TEZ_PREFIX + "simple.history.logging.dir";
-  
+
   /**
    * Int value. Maximum errors allowed while logging history data. After crossing this limit
history
    * logging gets disabled. The job continues to run after this.

http://git-wip-us.apache.org/repos/asf/tez/blob/98e35262/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index 3828890..58ab509 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -176,6 +176,25 @@ public class TezDAGID extends TezID {
     return appendTo(new StringBuilder(DAG)).toString();
   }
 
+  // The groupId prefix.
+  private static final String DAG_GROUPID_PREFIX = "daggroup";
+
+  /**
+   * Generate a DAG group id which groups multiple DAGs into one group.
+   *
+   * @param numDagsPerGroup The number of DAGs present in one group.
+   * @return The group id to be used for grouping numDagsPerGroup into one group.
+   */
+  public String getGroupId(int numDagsPerGroup) {
+    if (numDagsPerGroup <= 1) {
+      throw new IllegalArgumentException("numDagsPerGroup has to be more than one. Got: "
+ numDagsPerGroup);
+    }
+    return DAG_GROUPID_PREFIX + SEPARATOR +
+        getApplicationId().getClusterTimestamp() + SEPARATOR +
+        tezAppIdFormat.get().format(getApplicationId().getId()) + SEPARATOR +
+        tezDagIdFormat.get().format(getId() / numDagsPerGroup);
+  }
+
   public static TezDAGID fromString(String dagId) {
     int id = 0;
     int appId = 0;

http://git-wip-us.apache.org/repos/asf/tez/blob/98e35262/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
index d81f56a..b4217a1 100644
--- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
+++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
@@ -23,20 +23,26 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.SortedSet;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.apache.hadoop.yarn.server.timeline.TimelineEntityGroupPlugin;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
-public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin {
+public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implements Configurable
{
+  private static final Logger LOG = LoggerFactory.getLogger(TimelineCachePluginImpl.class);
 
   private static Set<String> summaryEntityTypes;
   private static Set<String> knownEntityTypes;
@@ -54,11 +60,27 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin
{
         EntityTypes.TEZ_APPLICATION.name());
   }
 
+  private Configuration conf;
+
+  private Set<Integer> allNumGroupsPerDag;
+
   // Empty public constructor
   public TimelineCachePluginImpl() {
+    setConf(new TezConfiguration());
+  }
+
+  private Set<TimelineEntityGroupId> createTimelineEntityGroupIds(TezDAGID dagId) {
+    ApplicationId appId = dagId.getApplicationId();
+    HashSet<TimelineEntityGroupId> groupIds = Sets.newHashSet(
+        TimelineEntityGroupId.newInstance(appId, appId.toString()),
+        TimelineEntityGroupId.newInstance(appId, dagId.toString()));
+    for (int numGroupsPerDag : allNumGroupsPerDag) {
+      groupIds.add(TimelineEntityGroupId.newInstance(appId, dagId.getGroupId(numGroupsPerDag)));
+    }
+    return groupIds;
   }
 
-  private TimelineEntityGroupId convertToTimelineEntityGroupId(String entityType, String
entityId) {
+  private Set<TimelineEntityGroupId> convertToTimelineEntityGroupIds(String entityType,
String entityId) {
     if (entityType == null || entityType.isEmpty()
         || entityId == null || entityId.isEmpty()) {
       return null;
@@ -66,27 +88,23 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin
{
     if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
       TezDAGID dagId = TezDAGID.fromString(entityId);
       if (dagId != null) {
-        return TimelineEntityGroupId.newInstance(dagId.getApplicationId(), dagId.toString());
+        return createTimelineEntityGroupIds(dagId);
       }
     } else if (entityType.equals(EntityTypes.TEZ_VERTEX_ID.name())) {
       TezVertexID vertexID = TezVertexID.fromString(entityId);
       if (vertexID != null) {
-        return TimelineEntityGroupId.newInstance(vertexID.getDAGId().getApplicationId(),
-            vertexID.getDAGId().toString());
+        return createTimelineEntityGroupIds(vertexID.getDAGId());
       }
 
     } else if (entityType.equals(EntityTypes.TEZ_TASK_ID.name())) {
       TezTaskID taskID = TezTaskID.fromString(entityId);
       if (taskID != null) {
-        return TimelineEntityGroupId.newInstance(taskID.getVertexID().getDAGId().getApplicationId(),
-            taskID.getVertexID().getDAGId().toString());
+        return createTimelineEntityGroupIds(taskID.getVertexID().getDAGId());
       }
     } else if (entityType.equals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name())) {
       TezTaskAttemptID taskAttemptID = TezTaskAttemptID.fromString(entityId);
       if (taskAttemptID != null) {
-        return TimelineEntityGroupId.newInstance(
-            taskAttemptID.getTaskID().getVertexID().getDAGId().getApplicationId(),
-            taskAttemptID.getTaskID().getVertexID().getDAGId().toString());
+        return createTimelineEntityGroupIds(taskAttemptID.getTaskID().getVertexID().getDAGId());
       }
     } else if (entityType.equals(EntityTypes.TEZ_CONTAINER_ID.name())) {
       String cId = entityId;
@@ -95,9 +113,9 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin
{
       }
       ContainerId containerId = ContainerId.fromString(cId);
       if (containerId != null) {
-        return TimelineEntityGroupId.newInstance(
+        return Sets.newHashSet(TimelineEntityGroupId.newInstance(
             containerId.getApplicationAttemptId().getApplicationId(),
-            containerId.getApplicationAttemptId().getApplicationId().toString());
+            containerId.getApplicationAttemptId().getApplicationId().toString()));
       }
     }
     return null;
@@ -113,15 +131,7 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin
{
         || summaryEntityTypes.contains(entityType)) {
       return null;
     }
-    TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(primaryFilter.getName(),
-        primaryFilter.getValue().toString());
-    if (groupId != null) {
-      TimelineEntityGroupId appGroupId =
-          TimelineEntityGroupId.newInstance(groupId.getApplicationId(),
-              groupId.getApplicationId().toString());
-      return Sets.newHashSet(groupId, appGroupId);
-    }
-    return null;
+    return convertToTimelineEntityGroupIds(primaryFilter.getName(), primaryFilter.getValue().toString());
   }
 
   @Override
@@ -129,14 +139,7 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin
{
     if (!knownEntityTypes.contains(entityType) || summaryEntityTypes.contains(entityType))
{
       return null;
     }
-    TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(entityType, entityId);
-    if (groupId != null) {
-      TimelineEntityGroupId appGroupId =
-          TimelineEntityGroupId.newInstance(groupId.getApplicationId(),
-              groupId.getApplicationId().toString());
-      return Sets.newHashSet(groupId, appGroupId);
-    }
-    return null;
+    return convertToTimelineEntityGroupIds(entityType, entityId);
   }
 
   @Override
@@ -147,20 +150,53 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin
{
         || entityIds == null || entityIds.isEmpty()) {
       return null;
     }
-    Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
-    Set<ApplicationId> appIdSet = new HashSet<ApplicationId>();
 
+    Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     for (String entityId : entityIds) {
-      TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(entityType, entityId);
+      Set<TimelineEntityGroupId> groupId = convertToTimelineEntityGroupIds(entityType,
entityId);
       if (groupId != null) {
-        groupIds.add(groupId);
-        appIdSet.add(groupId.getApplicationId());
+        groupIds.addAll(groupId);
       }
     }
-    for (ApplicationId appId : appIdSet) {
-      groupIds.add(TimelineEntityGroupId.newInstance(appId, appId.toString()));
-    }
     return groupIds;
   }
 
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf instanceof TezConfiguration ? conf : new TezConfiguration(conf);
+
+    this.allNumGroupsPerDag = loadAllNumDagsPerGroup();
+  }
+
+  private Set<Integer> loadAllNumDagsPerGroup() {
+    Set<Integer> allNumDagsPerGroup = new HashSet<Integer>();
+
+    int numDagsPerGroup = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
+        TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT);
+    if (numDagsPerGroup > 1) {
+      // Add current numDagsPerGroup from config.
+      allNumDagsPerGroup.add(numDagsPerGroup);
+    }
+
+    // Add the older values from config.
+    int [] usedNumGroups = conf.getInts(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP);
+    if (usedNumGroups != null) {
+      for (int i = 0; i < usedNumGroups.length; ++i) {
+        allNumDagsPerGroup.add(usedNumGroups[i]);
+      }
+    }
+
+    // Warn for performance impact
+    if (allNumDagsPerGroup.size() > 3) {
+      LOG.warn("Too many entries in " + TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP
+
+          ", this can result in slower lookup from Yarn Timeline server or slower load times
in TezUI.");
+    }
+    return allNumDagsPerGroup;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/98e35262/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
index 562a66e..6f819ba 100644
--- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
+++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
@@ -27,11 +27,14 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -43,6 +46,7 @@ import org.junit.Test;
 
 import com.google.common.collect.Sets;
 
+
 public class TestTimelineCachePluginImpl {
 
   static ApplicationId appId1;
@@ -61,8 +65,20 @@ public class TestTimelineCachePluginImpl {
   static Map<String, String> typeIdMap1;
   static Map<String, String> typeIdMap2;
 
-  TimelineCachePluginImpl plugin =
-      new TimelineCachePluginImpl();
+  private static TimelineCachePluginImpl createPlugin(int numDagsPerGroup, String usedNumDagsPerGroup)
{
+    Configuration conf = new Configuration(false);
+    if (numDagsPerGroup > 0) {
+      conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP, numDagsPerGroup);
+    }
+    if (usedNumDagsPerGroup != null) {
+      conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP,
usedNumDagsPerGroup);
+    }
+    if (numDagsPerGroup > 0 || usedNumDagsPerGroup != null) {
+      return ReflectionUtils.newInstance(TimelineCachePluginImpl.class, conf);
+    } else {
+      return new TimelineCachePluginImpl();
+    }
+  }
 
   @BeforeClass
   public static void beforeClass() {
@@ -94,11 +110,11 @@ public class TestTimelineCachePluginImpl {
     typeIdMap2.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID2.toString());
     typeIdMap2.put(EntityTypes.TEZ_TASK_ID.name(), taskID2.toString());
     typeIdMap2.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID2.toString());
-
   }
 
   @Test
   public void testGetTimelineEntityGroupIdByPrimaryFilter() {
+    TimelineCachePluginImpl plugin = createPlugin(100, null);
     for (Entry<String, String> entry : typeIdMap1.entrySet()) {
       NameValuePair primaryFilter = new NameValuePair(entry.getKey(), entry.getValue());
       Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION.name(),
@@ -108,19 +124,38 @@ public class TestTimelineCachePluginImpl {
         Assert.assertNull(groupIds);
         continue;
       }
+      Assert.assertEquals(3, groupIds.size());
+      Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+      while (iter.hasNext()) {
+        TimelineEntityGroupId groupId = iter.next();
+        Assert.assertEquals(appId1, groupId.getApplicationId());
+        Assert.assertTrue(getGroupIds(dagID1, appId1, 100).contains(groupId.getTimelineEntityGroupId()));
+      }
+    }
+  }
+
+  @Test
+  public void testGetTimelineEntityGroupIdByIdDefaultConfig() {
+    TimelineCachePluginImpl plugin = createPlugin(-1, null);
+    for (Entry<String, String> entry : typeIdMap1.entrySet()) {
+      Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(),
entry.getKey());
+      if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+        Assert.assertNull(groupIds);
+        continue;
+      }
       Assert.assertEquals(2, groupIds.size());
       Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
       while (iter.hasNext()) {
         TimelineEntityGroupId groupId = iter.next();
         Assert.assertEquals(appId1, groupId.getApplicationId());
-        Assert.assertTrue((dagID1.toString().equals(groupId.getTimelineEntityGroupId()))
-            || (appId1.toString().equals(groupId.getTimelineEntityGroupId())));
+        Assert.assertTrue(getGroupIds(dagID1, appId1).contains(groupId.getTimelineEntityGroupId()));
       }
     }
   }
 
   @Test
-  public void testGetTimelineEntityGroupIdById() {
+  public void testGetTimelineEntityGroupIdByIdNoGroupingConf() {
+    TimelineCachePluginImpl plugin = createPlugin(1, null);
     for (Entry<String, String> entry : typeIdMap1.entrySet()) {
       Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(),
entry.getKey());
       if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
@@ -132,14 +167,90 @@ public class TestTimelineCachePluginImpl {
       while (iter.hasNext()) {
         TimelineEntityGroupId groupId = iter.next();
         Assert.assertEquals(appId1, groupId.getApplicationId());
-        Assert.assertTrue((dagID1.toString().equals(groupId.getTimelineEntityGroupId()))
-            || (appId1.toString().equals(groupId.getTimelineEntityGroupId())));
+        Assert.assertTrue(getGroupIds(dagID1, appId1).contains(groupId.getTimelineEntityGroupId()));
+      }
+    }
+  }
+
+  @Test
+  public void testGetTimelineEntityGroupIdById() {
+    TimelineCachePluginImpl plugin = createPlugin(100, null);
+    for (Entry<String, String> entry : typeIdMap1.entrySet()) {
+      Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(),
entry.getKey());
+      if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+        Assert.assertNull(groupIds);
+        continue;
+      }
+      Assert.assertEquals(3, groupIds.size());
+      Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+      while (iter.hasNext()) {
+        TimelineEntityGroupId groupId = iter.next();
+        Assert.assertEquals(appId1, groupId.getApplicationId());
+        Assert.assertTrue(getGroupIds(dagID1, appId1, 100).contains(groupId.getTimelineEntityGroupId()));
+      }
+    }
+  }
+
+  @Test
+  public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsSingle() {
+    TimelineCachePluginImpl plugin = createPlugin(100, "50");
+    for (Entry<String, String> entry : typeIdMap2.entrySet()) {
+      Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(),
entry.getKey());
+      if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+        Assert.assertNull(groupIds);
+        continue;
+      }
+      Assert.assertEquals(4, groupIds.size());
+      Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+      while (iter.hasNext()) {
+        TimelineEntityGroupId groupId = iter.next();
+        Assert.assertEquals(appId2, groupId.getApplicationId());
+        Assert.assertTrue(getGroupIds(dagID2, appId2, 100, 50).contains(groupId.getTimelineEntityGroupId()));
+      }
+    }
+  }
+
+  @Test
+  public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsMultiple() {
+    TimelineCachePluginImpl plugin = createPlugin(100, "25, 50");
+    for (Entry<String, String> entry : typeIdMap2.entrySet()) {
+      Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(),
entry.getKey());
+      if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+        Assert.assertNull(groupIds);
+        continue;
+      }
+      Assert.assertEquals(5, groupIds.size());
+      Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+      while (iter.hasNext()) {
+        TimelineEntityGroupId groupId = iter.next();
+        Assert.assertEquals(appId2, groupId.getApplicationId());
+        Assert.assertTrue(getGroupIds(dagID2, appId2, 100, 25, 50).contains(groupId.getTimelineEntityGroupId()));
+      }
+    }
+  }
+
+  @Test
+  public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsEmpty() {
+    TimelineCachePluginImpl plugin = createPlugin(100, "");
+    for (Entry<String, String> entry : typeIdMap2.entrySet()) {
+      Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(),
entry.getKey());
+      if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+        Assert.assertNull(groupIds);
+        continue;
+      }
+      Assert.assertEquals(3, groupIds.size());
+      Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+      while (iter.hasNext()) {
+        TimelineEntityGroupId groupId = iter.next();
+        Assert.assertEquals(appId2, groupId.getApplicationId());
+        Assert.assertTrue(getGroupIds(dagID2, appId2, 100).contains(groupId.getTimelineEntityGroupId()));
       }
     }
   }
 
   @Test
   public void testGetTimelineEntityGroupIdByIds() {
+    TimelineCachePluginImpl plugin = createPlugin(100, null);
     for (Entry<String, String> entry : typeIdMap1.entrySet()) {
       SortedSet<String> entityIds = new TreeSet<String>();
       entityIds.add(entry.getValue());
@@ -150,31 +261,36 @@ public class TestTimelineCachePluginImpl {
         Assert.assertNull(groupIds);
         continue;
       }
-      Assert.assertEquals(4, groupIds.size());
+      Assert.assertEquals(6, groupIds.size());
       int found = 0;
       Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
       while (iter.hasNext()) {
         TimelineEntityGroupId groupId = iter.next();
-        if (groupId.getApplicationId().equals(appId1)
-            && groupId.getTimelineEntityGroupId().equals(dagID1.toString())) {
-          ++found;
-        } else if (groupId.getApplicationId().equals(appId2)
-            && groupId.getTimelineEntityGroupId().equals(dagID2.toString())) {
-          ++found;
-        } else if (groupId.getApplicationId().equals(appId1)
-            && groupId.getTimelineEntityGroupId().equals(appId1.toString())) {
-          ++found;
-        } else if (groupId.getApplicationId().equals(appId2)
-            && groupId.getTimelineEntityGroupId().equals(appId2.toString())) {
-          ++found;
+        if (groupId.getApplicationId().equals(appId1)) {
+          String entityGroupId = groupId.getTimelineEntityGroupId();
+          if (getGroupIds(dagID1, appId1, 100).contains(entityGroupId)) {
+            ++found;
+          } else {
+            Assert.fail("Unexpected group id: " + entityGroupId);
+          }
+        } else if (groupId.getApplicationId().equals(appId2)) {
+          String entityGroupId = groupId.getTimelineEntityGroupId();
+          if (getGroupIds(dagID2, appId2, 100).contains(entityGroupId)) {
+            ++found;
+          } else {
+            Assert.fail("Unexpected group id: " + entityGroupId);
+          }
+        } else {
+          Assert.fail("Unexpected appId: " + groupId.getApplicationId());
         }
       }
-      Assert.assertEquals("All groupIds not returned", 4, found);
+      Assert.assertEquals("All groupIds not returned", 6, found);
     }
   }
 
   @Test
   public void testInvalidIds() {
+    TimelineCachePluginImpl plugin = createPlugin(-1, null);
     Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_DAG_ID.name(),
         vertexID1.toString()));
     Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_VERTEX_ID.name(),
@@ -190,6 +306,7 @@ public class TestTimelineCachePluginImpl {
 
   @Test
   public void testInvalidTypeRequests() {
+    TimelineCachePluginImpl plugin = createPlugin(-1, null);
     Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION.name(),
         appId1.toString()));
     Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
@@ -206,7 +323,7 @@ public class TestTimelineCachePluginImpl {
 
   @Test
   public void testContainerIdConversion() {
-
+    TimelineCachePluginImpl plugin = createPlugin(-1, null);
     String entityType = EntityTypes.TEZ_CONTAINER_ID.name();
     SortedSet<String> entityIds = new TreeSet<String>();
     entityIds.add("tez_" + cId1.toString());
@@ -255,6 +372,13 @@ public class TestTimelineCachePluginImpl {
       }
     }
     Assert.assertEquals("All groupIds not returned", 1, found);
+  }
 
+  private Set<String> getGroupIds(TezDAGID dagId, ApplicationId appId, int ... allNumDagsPerGroup)
{
+    HashSet<String> groupIds = Sets.newHashSet(dagId.toString(), appId.toString());
+    for (int numDagsPerGroup : allNumDagsPerGroup) {
+      groupIds.add(dagId.getGroupId(numDagsPerGroup));
+    }
+    return groupIds;
   }
 }


Mime
View raw message