Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9698A17EFA for ; Thu, 16 Oct 2014 22:57:03 +0000 (UTC) Received: (qmail 26242 invoked by uid 500); 16 Oct 2014 22:57:03 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 26200 invoked by uid 500); 16 Oct 2014 22:57:03 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 26187 invoked by uid 99); 16 Oct 2014 22:57:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Oct 2014 22:57:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 36EFF9CBF9B; Thu, 16 Oct 2014 22:57:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Date: Thu, 16 Oct 2014 22:57:03 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: TEZ-1658. Additional data generation to Timeline for UI. (hitesh) Repository: tez Updated Branches: refs/heads/master 4186c6dee -> 190a74fdc TEZ-1658. Additional data generation to Timeline for UI. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/190a74fd Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/190a74fd Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/190a74fd Branch: refs/heads/master Commit: 190a74fdc28655f8ca1a3946687aad40c8254ad0 Parents: 2600c53 Author: Hitesh Shah Authored: Thu Oct 16 15:53:21 2014 -0700 Committer: Hitesh Shah Committed: Thu Oct 16 15:56:28 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 5 ++-- .../java/org/apache/tez/dag/app/dag/DAG.java | 2 ++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 14 ++++++++++++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 2 +- .../dag/history/events/DAGSubmittedEvent.java | 9 +++++++- .../history/events/TaskAttemptStartedEvent.java | 9 +++++++- .../impl/HistoryEventJsonConversion.java | 5 +++- .../apache/tez/dag/history/utils/DAGUtils.java | 18 +++++++++++---- .../app/dag/impl/TestTaskAttemptRecovery.java | 2 +- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 12 +++++----- .../TestHistoryEventsProtoConversion.java | 4 ++-- .../impl/TestHistoryEventJsonConversion.java | 4 ++-- .../tez/dag/history/utils/TestDAGUtils.java | 24 ++++++++++++++++---- .../ats/HistoryEventTimelineConversion.java | 14 +++++++++++- .../ats/TestHistoryEventTimelineConversion.java | 4 ++-- 17 files changed, 102 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 25da87f..fd262ca 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -33,6 +33,7 @@ ALL CHANGES: TEZ-1632. NPE at TestPreemption.testPreemptionWithoutSession TEZ-1674. Rename configuration parameters related to counters / memory scaling. TEZ-1176. Set parallelism should end up sending an update to ATS if numTasks are updated at run-time. + TEZ-1658. Additional data generation to Timeline for UI. Release 0.5.1: 2014-10-02 http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 3859373..ab81683 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -43,6 +43,7 @@ public class ATSConstants { public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId"; public static final String CONTAINER_ID = "containerId"; public static final String NODE_ID = "nodeId"; + public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress"; public static final String USER = "user"; /* Keys used in other info */ http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index e6a1d9c..1bdec84 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -689,7 +689,8 @@ public class DAGAppMaster extends AbstractService { try { if (LOG.isDebugEnabled()) { LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString() - + ", json=" + DAGUtils.generateSimpleJSONPlan(dagPB).toString()); + + ", json=" + + DAGUtils.generateSimpleJSONPlan(dagPB, newDag.getVertexNameIDMapping()).toString()); } } catch (JSONException e) { LOG.warn("Failed to generate json for DAG", e); @@ -1837,7 +1838,7 @@ public class DAGAppMaster extends AbstractService { // for an app later DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(), submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources, - newDAG.getUserName()); + newDAG.getUserName(), newDAG.getVertexNameIDMapping()); try { historyEventHandler.handleCriticalEvent( new DAGHistoryEvent(newDAG.getID(), submittedEvent)); http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index a2f04ab..8677015 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -85,4 +85,6 @@ public interface DAG { ACLManager getACLManager(); + Map getVertexNameIDMapping(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index c4e16e2..823626d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -553,6 +553,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override + public Map getVertexNameIDMapping() { + this.readLock.lock(); + try { + Map idNameMap = new HashMap(); + for (Vertex v : getVertices().values()) { + idNameMap.put(v.getName(), v.getVertexId()); + } + return idNameMap; + } finally { + this.readLock.unlock(); + } + } + + @Override public TezCounters getAllCounters() { readLock.lock(); http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 56dd303..eab07a5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1003,7 +1003,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getTask().getVertex().getName(), launchTime, containerId, containerNodeId, - inProgressLogsUrl, completedLogsUrl); + inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), startEvt)); } http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java index 5911ff3..0074a4e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java @@ -35,6 +35,7 @@ import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; import org.apache.tez.dag.utils.ProtoUtils; @@ -53,6 +54,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { private ApplicationAttemptId applicationAttemptId; private String user; private Map cumulativeAdditionalLocalResources; + private Map vertexNameIDMap; public DAGSubmittedEvent() { } @@ -60,7 +62,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { public DAGSubmittedEvent(TezDAGID dagID, long submitTime, DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId, Map cumulativeAdditionalLocalResources, - String user) { + String user, Map vertexNameIDMap) { this.dagID = dagID; this.dagName = dagPlan.getName(); this.submitTime = submitTime; @@ -68,6 +70,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { this.applicationAttemptId = applicationAttemptId; this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources; this.user = user; + this.vertexNameIDMap = vertexNameIDMap; } @Override @@ -182,4 +185,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { return user; } + public Map getVertexNameIDMap() { + return vertexNameIDMap; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index f369823..36add86 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -39,11 +39,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent { private long startTime; private ContainerId containerId; private NodeId nodeId; + private String nodeHttpAddress; public TaskAttemptStartedEvent(TezTaskAttemptID taId, String vertexName, long startTime, ContainerId containerId, NodeId nodeId, - String inProgressLogsUrl, String completedLogsUrl) { + String inProgressLogsUrl, String completedLogsUrl, + String nodeHttpAddress) { this.taskAttemptId = taId; this.vertexName = vertexName; this.startTime = startTime; @@ -51,6 +53,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent { this.nodeId = nodeId; this.inProgressLogsUrl = inProgressLogsUrl; this.completedLogsUrl = completedLogsUrl; + this.nodeHttpAddress = nodeHttpAddress; } public TaskAttemptStartedEvent() { @@ -136,4 +139,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent { return completedLogsUrl; } + public String getNodeHttpAddress() { + return nodeHttpAddress; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 37292ff..cec0a37 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -22,6 +22,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Map.Entry; + import org.apache.tez.common.ATSConstants; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.history.HistoryEvent; @@ -44,6 +46,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TezVertexID; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -408,7 +411,7 @@ public class HistoryEventJsonConversion { // Other info such as dag plan JSONObject otherInfo = new JSONObject(); otherInfo.put(ATSConstants.DAG_PLAN, - DAGUtils.generateSimpleJSONPlan(event.getDAGPlan())); + DAGUtils.generateSimpleJSONPlan(event.getDAGPlan(), event.getVertexNameIDMap())); jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index 5d364fd..309d6d2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -33,9 +33,12 @@ import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -47,6 +50,7 @@ public class DAGUtils { public static final String VERTEX_GROUPS_KEY = "vertexGroups"; public static final String VERTEX_NAME_KEY = "vertexName"; + public static final String VERTEX_ID_KEY = "vertexId"; public static final String PROCESSOR_CLASS_KEY = "processorClass"; public static final String IN_EDGE_IDS_KEY = "inEdgeIds"; public static final String OUT_EDGE_IDS_KEY = "outEdgeIds"; @@ -81,10 +85,11 @@ public class DAGUtils { - public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException { + public static JSONObject generateSimpleJSONPlan(DAGPlan dagPlan, + Map vertexNameIDMap) throws JSONException { JSONObject dagJson; try { - dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan)); + dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan, vertexNameIDMap)); } catch (IOException e) { throw new TezUncheckedException(e); } @@ -125,7 +130,7 @@ public class DAGUtils { } public static Map convertDAGPlanToATSMap( - DAGProtos.DAGPlan dagPlan) throws IOException { + DAGPlan dagPlan, Map vertexNameIDMap) throws IOException { final String VERSION_KEY = "version"; final int version = 1; @@ -136,7 +141,12 @@ public class DAGUtils { for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) { Map vertexMap = new LinkedHashMap(); vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName()); - + if (vertexNameIDMap != null && !vertexNameIDMap.isEmpty()) { + TezVertexID vertexID = vertexNameIDMap.get(vertexPlan.getName()); + if (vertexID != null) { + vertexMap.put(VERTEX_ID_KEY, vertexID.toString()); + } + } if (vertexPlan.hasProcessorDescriptor()) { vertexMap.put(PROCESSOR_CLASS_KEY, vertexPlan.getProcessorDescriptor().getClassName()); http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index a443a35..143268b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -79,7 +79,7 @@ public class TestTaskAttemptRecovery { private void restoreFromTAStartEvent() { TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - startTime, mock(ContainerId.class), mock(NodeId.class), "", "")); + startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); assertEquals(startTime, ta.getLaunchTime()); assertEquals(TaskAttemptState.RUNNING, recoveredState); } http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index 6c0d55e..fe7ecff 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -210,7 +210,7 @@ public class TestTaskRecovery { long taStartTime = taskStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "")); + taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(0, task.getFinishedAttemptsCount()); assertEquals(taskScheduledTime, task.scheduledTime); @@ -642,7 +642,7 @@ public class TestTaskRecovery { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "")); + taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -695,7 +695,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.KILLED, "", null)); } @@ -725,7 +725,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.FAILED, "", null)); } @@ -755,7 +755,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts - 1; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.FAILED, "", null)); } @@ -765,7 +765,7 @@ public class TestTaskRecovery { TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId()); TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId, - vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "")); + vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "")); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index f030db7..903b4fe 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -148,7 +148,7 @@ public class TestHistoryEventsProtoConversion { ApplicationId.newInstance(0, 1), 1), 1001l, DAGPlan.newBuilder().setName("foo").build(), ApplicationAttemptId.newInstance( - ApplicationId.newInstance(0, 1), 1), null, ""); + ApplicationId.newInstance(0, 1), 1), null, "", null); DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent) testProtoConversion(event); Assert.assertEquals(event.getApplicationAttemptId(), @@ -435,7 +435,7 @@ public class TestHistoryEventsProtoConversion { "vertex1", 10009l, ContainerId.newInstance( ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance( - "host1", 19999), "inProgress", "Completed"); + "host1", 19999), "inProgress", "Completed", "nodeHttpAddress"); TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index d7aca55..c9384e1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -110,7 +110,7 @@ public class TestHistoryEventJsonConversion { break; case DAG_SUBMITTED: event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, - null, user); + null, user, null); break; case DAG_INITIALIZED: event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName()); @@ -146,7 +146,7 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_STARTED: event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null); + nodeId, null, null, "nodeHttpAddress"); break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java index 0be67ad..e50c67c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java @@ -20,11 +20,13 @@ package org.apache.tez.dag.history.utils; import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.EdgeProperty; @@ -40,6 +42,8 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.OutputCommitter; import org.codehaus.jettison.json.JSONException; import org.junit.Assert; @@ -94,7 +98,17 @@ public class TestDAGUtils { @SuppressWarnings("unchecked") public void testConvertDAGPlanToATSMap() throws IOException, JSONException { DAGPlan dagPlan = createDAG(); - Map atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan); + Map idNameMap = new HashMap(); + ApplicationId appId = ApplicationId.newInstance(1, 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vId1 = TezVertexID.getInstance(dagId, 1); + TezVertexID vId2 = TezVertexID.getInstance(dagId, 2); + TezVertexID vId3 = TezVertexID.getInstance(dagId, 3); + idNameMap.put("vertex1", vId1); + idNameMap.put("vertex2", vId2); + idNameMap.put("vertex3", vId3); + + Map atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan, idNameMap); Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY)); Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY)); Assert.assertTrue(atsMap.containsKey("version")); @@ -104,7 +118,6 @@ public class TestDAGUtils { Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY)); Assert.assertEquals(3, ((Collection) atsMap.get(DAGUtils.VERTICES_KEY)).size()); - Set vNames = Sets.newHashSet("vertex1", "vertex2", "vertex3"); Set inEdgeIds = new HashSet(); Set outEdgeIds = new HashSet(); @@ -115,6 +128,10 @@ public class TestDAGUtils { for (Object o : ((Collection) atsMap.get(DAGUtils.VERTICES_KEY))) { Map v = (Map) o; Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_NAME_KEY)); + Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_ID_KEY)); + String vId = (String)v.get(DAGUtils.VERTEX_ID_KEY); + String vName = (String)v.get(DAGUtils.VERTEX_NAME_KEY); + Assert.assertEquals(idNameMap.get(vName).toString(), vId); Assert.assertTrue(v.containsKey(DAGUtils.PROCESSOR_CLASS_KEY)); Assert.assertTrue(v.containsKey(DAGUtils.USER_PAYLOAD_AS_TEXT)); @@ -125,8 +142,7 @@ public class TestDAGUtils { outEdgeIds.addAll(((Collection) v.get(DAGUtils.OUT_EDGE_IDS_KEY))); } - String vName = (String) v.get(DAGUtils.VERTEX_NAME_KEY); - Assert.assertTrue(vNames.contains(vName)); + Assert.assertTrue(idNameMap.containsKey(vName)); String procPayload = vName + " Processor HistoryText"; Assert.assertEquals(procPayload, v.get(DAGUtils.USER_PAYLOAD_AS_TEXT)); http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 3ed3077..97cc3f7 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -199,6 +199,8 @@ public class HistoryEventTimelineConversion { stoppedEvt.setTimestamp(event.getStoppedTime()); atsEntity.addEvent(stoppedEvt); + atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus()); + atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus()); atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime()); @@ -217,6 +219,7 @@ public class HistoryEventTimelineConversion { atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); @@ -291,7 +294,7 @@ public class HistoryEventTimelineConversion { try { atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, - DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); + DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan(), event.getVertexNameIDMap())); } catch (IOException e) { throw new TezUncheckedException(e); } @@ -318,6 +321,8 @@ public class HistoryEventTimelineConversion { finishEvt.setTimestamp(event.getFinishTime()); atsEntity.addEvent(finishEvt); + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); @@ -355,6 +360,9 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); + atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); + atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); return atsEntity; } @@ -374,6 +382,8 @@ public class HistoryEventTimelineConversion { finishEvt.setTimestamp(event.getFinishTime()); atsEntity.addEvent(finishEvt); + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); @@ -421,6 +431,8 @@ public class HistoryEventTimelineConversion { finishEvt.setTimestamp(event.getFinishTime()); atsEntity.addEvent(finishEvt); + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); http://git-wip-us.apache.org/repos/asf/tez/blob/190a74fd/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index f275921..d36172f 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -109,7 +109,7 @@ public class TestHistoryEventTimelineConversion { break; case DAG_SUBMITTED: event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, - null, user); + null, user, null); break; case DAG_INITIALIZED: event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName()); @@ -145,7 +145,7 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_STARTED: event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null); + nodeId, null, null, "nodeHttpAddress"); break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),