tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [1/2] git commit: TEZ-1658. Additional data generation to Timeline for UI. (hitesh)
Date Thu, 16 Oct 2014 22:57:03 GMT
Repository: tez
Updated Branches:
  refs/heads/master 4186c6dee -> 190a74fdc


TEZ-1658. Additional data generation to Timeline for UI. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/190a74fd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/190a74fd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/190a74fd

Branch: refs/heads/master
Commit: 190a74fdc28655f8ca1a3946687aad40c8254ad0
Parents: 2600c53
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Oct 16 15:53:21 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Oct 16 15:56:28 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/ATSConstants.java     |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  5 ++--
 .../java/org/apache/tez/dag/app/dag/DAG.java    |  2 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 14 ++++++++++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  2 +-
 .../dag/history/events/DAGSubmittedEvent.java   |  9 +++++++-
 .../history/events/TaskAttemptStartedEvent.java |  9 +++++++-
 .../impl/HistoryEventJsonConversion.java        |  5 +++-
 .../apache/tez/dag/history/utils/DAGUtils.java  | 18 +++++++++++----
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  2 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  | 12 +++++-----
 .../TestHistoryEventsProtoConversion.java       |  4 ++--
 .../impl/TestHistoryEventJsonConversion.java    |  4 ++--
 .../tez/dag/history/utils/TestDAGUtils.java     | 24 ++++++++++++++++----
 .../ats/HistoryEventTimelineConversion.java     | 14 +++++++++++-
 .../ats/TestHistoryEventTimelineConversion.java |  4 ++--
 17 files changed, 102 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25da87f..fd262ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,7 @@ ALL CHANGES:
   TEZ-1632. NPE at TestPreemption.testPreemptionWithoutSession
   TEZ-1674. Rename configuration parameters related to counters / memory scaling.
   TEZ-1176. Set parallelism should end up sending an update to ATS if numTasks are updated at run-time.
+  TEZ-1658. Additional data generation to Timeline for UI.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 3859373..ab81683 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -43,6 +43,7 @@ public class ATSConstants {
   public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
   public static final String CONTAINER_ID = "containerId";
   public static final String NODE_ID = "nodeId";
+  public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress";
   public static final String USER = "user";
 
   /* Keys used in other info */

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e6a1d9c..1bdec84 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -689,7 +689,8 @@ public class DAGAppMaster extends AbstractService {
     try {
       if (LOG.isDebugEnabled()) {
         LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString()
-            + ", json=" + DAGUtils.generateSimpleJSONPlan(dagPB).toString());
+            + ", json="
+            + DAGUtils.generateSimpleJSONPlan(dagPB, newDag.getVertexNameIDMapping()).toString());
       }
     } catch (JSONException e) {
       LOG.warn("Failed to generate json for DAG", e);
@@ -1837,7 +1838,7 @@ public class DAGAppMaster extends AbstractService {
     // for an app later
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
         submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
-        newDAG.getUserName());
+        newDAG.getUserName(), newDAG.getVertexNameIDMapping());
     try {
       historyEventHandler.handleCriticalEvent(
           new DAGHistoryEvent(newDAG.getID(), submittedEvent));

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index a2f04ab..8677015 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -85,4 +85,6 @@ public interface DAG {
 
   ACLManager getACLManager();
 
+  Map<String, TezVertexID> getVertexNameIDMapping();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index c4e16e2..823626d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -553,6 +553,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public Map<String, TezVertexID> getVertexNameIDMapping() {
+    this.readLock.lock();
+    try {
+      Map<String, TezVertexID> idNameMap = new HashMap<String, TezVertexID>();
+      for (Vertex v : getVertices().values()) {
+        idNameMap.put(v.getName(), v.getVertexId());
+      }
+      return idNameMap;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public TezCounters getAllCounters() {
 
     readLock.lock();

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 56dd303..eab07a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1003,7 +1003,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
         attemptId, getTask().getVertex().getName(),
         launchTime, containerId, containerNodeId,
-        inProgressLogsUrl, completedLogsUrl);
+        inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), startEvt));
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 5911ff3..0074a4e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -35,6 +35,7 @@ import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.utils.ProtoUtils;
@@ -53,6 +54,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   private ApplicationAttemptId applicationAttemptId;
   private String user;
   private Map<String, LocalResource> cumulativeAdditionalLocalResources;
+  private Map<String, TezVertexID> vertexNameIDMap;
 
   public DAGSubmittedEvent() {
   }
@@ -60,7 +62,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
       DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
       Map<String, LocalResource> cumulativeAdditionalLocalResources,
-      String user) {
+      String user, Map<String, TezVertexID> vertexNameIDMap) {
     this.dagID = dagID;
     this.dagName = dagPlan.getName();
     this.submitTime = submitTime;
@@ -68,6 +70,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     this.applicationAttemptId = applicationAttemptId;
     this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
     this.user = user;
+    this.vertexNameIDMap = vertexNameIDMap;
   }
 
   @Override
@@ -182,4 +185,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     return user;
   }
 
+  public Map<String, TezVertexID> getVertexNameIDMap() {
+    return vertexNameIDMap;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index f369823..36add86 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -39,11 +39,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   private long startTime;
   private ContainerId containerId;
   private NodeId nodeId;
+  private String nodeHttpAddress;
 
   public TaskAttemptStartedEvent(TezTaskAttemptID taId,
       String vertexName, long startTime,
       ContainerId containerId, NodeId nodeId,
-      String inProgressLogsUrl, String completedLogsUrl) {
+      String inProgressLogsUrl, String completedLogsUrl,
+      String nodeHttpAddress) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
     this.startTime = startTime;
@@ -51,6 +53,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     this.nodeId = nodeId;
     this.inProgressLogsUrl = inProgressLogsUrl;
     this.completedLogsUrl = completedLogsUrl;
+    this.nodeHttpAddress = nodeHttpAddress;
   }
 
   public TaskAttemptStartedEvent() {
@@ -136,4 +139,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     return completedLogsUrl;
   }
 
+  public String getNodeHttpAddress() {
+    return nodeHttpAddress;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 37292ff..cec0a37 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -22,6 +22,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import java.util.Map.Entry;
+
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -44,6 +46,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezVertexID;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -408,7 +411,7 @@ public class HistoryEventJsonConversion {
     // Other info such as dag plan
     JSONObject otherInfo = new JSONObject();
     otherInfo.put(ATSConstants.DAG_PLAN,
-        DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
+        DAGUtils.generateSimpleJSONPlan(event.getDAGPlan(), event.getVertexNameIDMap()));
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 5d364fd..309d6d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -33,9 +33,12 @@ import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -47,6 +50,7 @@ public class DAGUtils {
   public static final String VERTEX_GROUPS_KEY = "vertexGroups";
 
   public static final String VERTEX_NAME_KEY = "vertexName";
+  public static final String VERTEX_ID_KEY = "vertexId";
   public static final String PROCESSOR_CLASS_KEY = "processorClass";
   public static final String IN_EDGE_IDS_KEY = "inEdgeIds";
   public static final String OUT_EDGE_IDS_KEY = "outEdgeIds";
@@ -81,10 +85,11 @@ public class DAGUtils {
 
 
 
-  public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException {
+  public static JSONObject generateSimpleJSONPlan(DAGPlan dagPlan,
+      Map<String, TezVertexID> vertexNameIDMap) throws JSONException {
     JSONObject dagJson;
     try {
-      dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan));
+      dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan, vertexNameIDMap));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
@@ -125,7 +130,7 @@ public class DAGUtils {
   }
 
   public static Map<String,Object> convertDAGPlanToATSMap(
-      DAGProtos.DAGPlan dagPlan) throws IOException {
+      DAGPlan dagPlan, Map<String, TezVertexID> vertexNameIDMap) throws IOException {
 
     final String VERSION_KEY = "version";
     final int version = 1;
@@ -136,7 +141,12 @@ public class DAGUtils {
     for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
       Map<String,Object> vertexMap = new LinkedHashMap<String, Object>();
       vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());
-
+      if (vertexNameIDMap != null && !vertexNameIDMap.isEmpty()) {
+        TezVertexID vertexID = vertexNameIDMap.get(vertexPlan.getName());
+        if (vertexID != null) {
+          vertexMap.put(VERTEX_ID_KEY, vertexID.toString());
+        }
+      }
       if (vertexPlan.hasProcessorDescriptor()) {
         vertexMap.put(PROCESSOR_CLASS_KEY,
             vertexPlan.getProcessorDescriptor().getClassName());

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index a443a35..143268b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -79,7 +79,7 @@ public class TestTaskAttemptRecovery {
   private void restoreFromTAStartEvent() {
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            startTime, mock(ContainerId.class), mock(NodeId.class), "", ""));
+            startTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(TaskAttemptState.RUNNING, recoveredState);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 6c0d55e..fe7ecff 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -210,7 +210,7 @@ public class TestTaskRecovery {
     long taStartTime = taskStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", ""));
+            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(0, task.getFinishedAttemptsCount());
     assertEquals(taskScheduledTime, task.scheduledTime);
@@ -642,7 +642,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", ""));
+            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,
         ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -695,7 +695,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.KILLED, "", null));
     }
@@ -725,7 +725,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.FAILED, "", null));
     }
@@ -755,7 +755,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts - 1; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.FAILED, "", null));
     }
@@ -765,7 +765,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId());
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId,
-            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", ""));
+            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
 
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index f030db7..903b4fe 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -148,7 +148,7 @@ public class TestHistoryEventsProtoConversion {
         ApplicationId.newInstance(0, 1), 1), 1001l,
         DAGPlan.newBuilder().setName("foo").build(),
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1), null, "");
+            ApplicationId.newInstance(0, 1), 1), null, "", null);
     DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -435,7 +435,7 @@ public class TestHistoryEventsProtoConversion {
         "vertex1", 10009l, ContainerId.newInstance(
         ApplicationAttemptId.newInstance(
             ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
-        "host1", 19999), "inProgress", "Completed");
+        "host1", 19999), "inProgress", "Completed", "nodeHttpAddress");
     TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getTaskAttemptID(),

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index d7aca55..c9384e1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -110,7 +110,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case DAG_SUBMITTED:
           event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
-              null, user);
+              null, user, null);
           break;
         case DAG_INITIALIZED:
           event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
@@ -146,7 +146,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_STARTED:
           event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
-              nodeId, null, null);
+              nodeId, null, null, "nodeHttpAddress");
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 0be67ad..e50c67c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -20,11 +20,13 @@ package org.apache.tez.dag.history.utils;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -40,6 +42,8 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.codehaus.jettison.json.JSONException;
 import org.junit.Assert;
@@ -94,7 +98,17 @@ public class TestDAGUtils {
   @SuppressWarnings("unchecked")
   public void testConvertDAGPlanToATSMap() throws IOException, JSONException {
     DAGPlan dagPlan = createDAG();
-    Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan);
+    Map<String,TezVertexID> idNameMap = new HashMap<String, TezVertexID>();
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId1 = TezVertexID.getInstance(dagId, 1);
+    TezVertexID vId2 = TezVertexID.getInstance(dagId, 2);
+    TezVertexID vId3 = TezVertexID.getInstance(dagId, 3);
+    idNameMap.put("vertex1", vId1);
+    idNameMap.put("vertex2", vId2);
+    idNameMap.put("vertex3", vId3);
+
+    Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan, idNameMap);
     Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
     Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
     Assert.assertTrue(atsMap.containsKey("version"));
@@ -104,7 +118,6 @@ public class TestDAGUtils {
     Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY));
 
     Assert.assertEquals(3, ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY)).size());
-    Set<String> vNames = Sets.newHashSet("vertex1", "vertex2", "vertex3");
 
     Set<String> inEdgeIds = new HashSet<String>();
     Set<String> outEdgeIds = new HashSet<String>();
@@ -115,6 +128,10 @@ public class TestDAGUtils {
     for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY))) {
       Map<String, Object> v = (Map<String, Object>) o;
       Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_NAME_KEY));
+      Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_ID_KEY));
+      String vId = (String)v.get(DAGUtils.VERTEX_ID_KEY);
+      String vName = (String)v.get(DAGUtils.VERTEX_NAME_KEY);
+      Assert.assertEquals(idNameMap.get(vName).toString(), vId);
       Assert.assertTrue(v.containsKey(DAGUtils.PROCESSOR_CLASS_KEY));
       Assert.assertTrue(v.containsKey(DAGUtils.USER_PAYLOAD_AS_TEXT));
 
@@ -125,8 +142,7 @@ public class TestDAGUtils {
         outEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.OUT_EDGE_IDS_KEY)));
       }
 
-      String vName = (String) v.get(DAGUtils.VERTEX_NAME_KEY);
-      Assert.assertTrue(vNames.contains(vName));
+      Assert.assertTrue(idNameMap.containsKey(vName));
       String procPayload = vName + " Processor HistoryText";
       Assert.assertEquals(procPayload, v.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 3ed3077..97cc3f7 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -199,6 +199,8 @@ public class HistoryEventTimelineConversion {
     stoppedEvt.setTimestamp(event.getStoppedTime());
     atsEntity.addEvent(stoppedEvt);
 
+    atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus());
+
     atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus());
     atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime());
 
@@ -217,6 +219,7 @@ public class HistoryEventTimelineConversion {
 
     atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
     atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
 
     atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
     atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
@@ -291,7 +294,7 @@ public class HistoryEventTimelineConversion {
 
     try {
       atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
-          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan(), event.getVertexNameIDMap()));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
@@ -318,6 +321,8 @@ public class HistoryEventTimelineConversion {
     finishEvt.setTimestamp(event.getFinishTime());
     atsEntity.addEvent(finishEvt);
 
+    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
+
     atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
     atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
     atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
@@ -355,6 +360,9 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
     atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
     atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString());
+    atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+    atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
 
     return atsEntity;
   }
@@ -374,6 +382,8 @@ public class HistoryEventTimelineConversion {
     finishEvt.setTimestamp(event.getFinishTime());
     atsEntity.addEvent(finishEvt);
 
+    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
+
     atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
     atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
     atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
@@ -421,6 +431,8 @@ public class HistoryEventTimelineConversion {
     finishEvt.setTimestamp(event.getFinishTime());
     atsEntity.addEvent(finishEvt);
 
+    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
+
     atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
     atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
     atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());

http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index f275921..d36172f 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -109,7 +109,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case DAG_SUBMITTED:
           event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
-              null, user);
+              null, user, null);
           break;
         case DAG_INITIALIZED:
           event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
@@ -145,7 +145,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_STARTED:
           event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
-              nodeId, null, null);
+              nodeId, null, null, "nodeHttpAddress");
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),


Mime
View raw message