Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9EEC917381 for ; Thu, 2 Oct 2014 20:45:49 +0000 (UTC) Received: (qmail 9017 invoked by uid 500); 2 Oct 2014 20:45:49 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 8926 invoked by uid 500); 2 Oct 2014 20:45:49 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 8904 invoked by uid 99); 2 Oct 2014 20:45:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Oct 2014 20:45:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 274DBA179F8; Thu, 2 Oct 2014 20:45:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeagles@apache.org To: commits@tez.apache.org Date: Thu, 02 Oct 2014 20:45:49 -0000 Message-Id: <203a57e62c9747a58d9bb38892359556@git.apache.org> In-Reply-To: <5427deca579743e3804373b15e837d93@git.apache.org> References: <5427deca579743e3804373b15e837d93@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/28] TEZ-1495. ATS integration for TezClient (Prakash Ramachandran via bikas) http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java index 8057714..d9cafc7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,12 +53,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { private TezCounters tezCounters; private boolean fromSummary = false; private VertexStats vertexStats; + private Map vertexTaskStats; - public VertexFinishedEvent(TezVertexID vertexId, - String vertexName, long initRequestedTime, long initedTime, - long startRequestedTime, long startedTime, long finishTime, - VertexState state, String diagnostics, TezCounters counters, - VertexStats vertexStats) { + public VertexFinishedEvent(TezVertexID vertexId, String vertexName, long initRequestedTime, + long initedTime, long startRequestedTime, long startedTime, + long finishTime, VertexState state, String diagnostics, + TezCounters counters, VertexStats vertexStats, + Map vertexTaskStats) { this.vertexName = vertexName; this.vertexID = vertexId; this.initRequestedTime = initRequestedTime; @@ -69,6 +71,7 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { this.diagnostics = diagnostics; this.tezCounters = counters; this.vertexStats = vertexStats; + this.vertexTaskStats = vertexTaskStats; } public VertexFinishedEvent() { @@ -138,10 +141,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { + ", status=" + state.name() + ", diagnostics=" + diagnostics + ", counters=" + ( tezCounters == null ? "null" : - tezCounters.toString() - .replaceAll("\\n", ", ").replaceAll("\\s+", " ")) - + ", vertexStats=" + (vertexStats == null ? "null" - : vertexStats.toString()); + tezCounters.toString().replaceAll("\\n", ", ").replaceAll("\\s+", " ")) + + ", vertexStats=" + (vertexStats == null ? "null" : vertexStats.toString()) + + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString()); } public TezVertexID getVertexID() { @@ -176,6 +178,10 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { return startTime; } + public Map getVertexTaskStats() { + return vertexTaskStats; + } + @Override public void toSummaryProtoStream(OutputStream outputStream) throws IOException { VertexFinishStateProto finishStateProto = http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java index 136c5f1..a8bd21e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.records.TezVertexID; @@ -105,4 +106,7 @@ public class VertexStartedEvent implements HistoryEvent { return startTime; } + public VertexState getVertexState() { + return VertexState.RUNNING; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 12dcf1c..a9987d6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.history.logging.impl; +import org.apache.tez.common.ATSConstants; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.AMLaunchedEvent; @@ -36,7 +37,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.EntityTypes; -import org.apache.tez.dag.history.utils.ATSConstants; import org.apache.tez.dag.history.utils.DAGUtils; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java deleted file mode 100644 index 0188a8e..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.utils; - -public class ATSConstants { - - // TODO remove once YARN exposes proper constants - - /* Top level keys */ - public static final String ENTITY = "entity"; - public static final String ENTITY_TYPE = "entitytype"; - public static final String EVENTS = "events"; - public static final String EVENT_TYPE = "eventtype"; - public static final String TIMESTAMP = "ts"; - public static final String EVENT_INFO = "eventinfo"; - public static final String RELATED_ENTITIES = "relatedEntities"; - public static final String PRIMARY_FILTERS = "primaryfilters"; - public static final String SECONDARY_FILTERS = "secondaryfilters"; - public static final String OTHER_INFO = "otherinfo"; - - /* Section for related entities */ - public static final String APPLICATION_ID = "applicationId"; - public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId"; - public static final String CONTAINER_ID = "containerId"; - public static final String NODE_ID = "nodeId"; - public static final String USER = "user"; - - /* Keys used in other info */ - public static final String APP_SUBMIT_TIME = "appSubmitTime"; - - /* Tez-specific info */ - public static final String DAG_PLAN = "dagPlan"; - public static final String DAG_NAME = "dagName"; - public static final String VERTEX_NAME = "vertexName"; - public static final String SCHEDULED_TIME = "scheduledTime"; - public static final String INIT_REQUESTED_TIME = "initRequestedTime"; - public static final String INIT_TIME = "initTime"; - public static final String START_REQUESTED_TIME = "startRequestedTime"; - public static final String START_TIME = "startTime"; - public static final String FINISH_TIME = "endTime"; - public static final String TIME_TAKEN = "timeTaken"; - public static final String STATUS = "status"; - public static final String DIAGNOSTICS = "diagnostics"; - public static final String COUNTERS = "counters"; - public static final String STATS = "stats"; - public static final String NUM_TASKS = "numTasks"; - public static final String PROCESSOR_CLASS_NAME = "processorClassName"; - public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL"; - public static final String COMPLETED_LOGS_URL = "completedLogsURL"; - public static final String EXIT_STATUS = "exitStatus"; - - /* Counters-related keys */ - public static final String COUNTER_GROUPS = "counterGroups"; - public static final String COUNTER_GROUP_NAME = "counterGroupName"; - public static final String COUNTER_GROUP_DISPLAY_NAME = "counterGroupDisplayName"; - public static final String COUNTER_NAME = "counterName"; - public static final String COUNTER_DISPLAY_NAME = "counterDisplayName"; - public static final String COUNTER_VALUE = "counterValue"; - -} http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index 232a3b2..cab3c83 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.tez.common.ATSConstants; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java index 9042a93..b278d8f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java @@ -572,7 +572,7 @@ public class TestVertexRecovery { vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), "vertex1", initRequestedTime, initedTime, startRequestedTime, startTime, finishTime, VertexState.SUCCEEDED, "", - new TezCounters(), new VertexStats())); + new TezCounters(), new VertexStats(), null)); assertEquals(finishTime, vertex1.finishTime); assertEquals(VertexState.SUCCEEDED, recoveredState); assertEquals(false, vertex1.recoveryCommitInProgress); @@ -801,7 +801,7 @@ public class TestVertexRecovery { recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), "vertex1", initRequestedTime, initedTime, initRequestedTime + 300L, initRequestedTime + 400L, initRequestedTime + 500L, - VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats())); + VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null)); assertEquals(VertexState.SUCCEEDED, recoveredState); vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index bcbe6f1..f52671c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -346,7 +346,7 @@ public class TestHistoryEventsProtoConversion { new VertexFinishedEvent(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR, - null, null, null); + null, null, null, null); VertexFinishedEvent deserializedEvent = (VertexFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); @@ -361,7 +361,7 @@ public class TestHistoryEventsProtoConversion { new VertexFinishedEvent(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR, - "diagnose", new TezCounters(), new VertexStats()); + "diagnose", new TezCounters(), new VertexStats(), null); VertexFinishedEvent deserializedEvent = (VertexFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index f674fc0..2149053 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -127,7 +127,7 @@ public class TestHistoryEventJsonConversion { case VERTEX_FINISHED: event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR, - null, null, null); + null, null, null, null); break; case TASK_STARTED: event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt()); http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index f078f1d..a81bdf4 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -19,9 +19,11 @@ package org.apache.tez.dag.history.logging.ats; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.tez.common.ATSConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -41,7 +43,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.EntityTypes; -import org.apache.tez.dag.history.utils.ATSConstants; import org.apache.tez.dag.history.utils.DAGUtils; public class HistoryEventTimelineConversion { @@ -253,6 +254,7 @@ public class HistoryEventTimelineConversion { atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString()); return atsEntity; } @@ -422,6 +424,13 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.STATS, DAGUtils.convertVertexStatsToATSMap(event.getVertexStats())); + final Map vertexTaskStats = event.getVertexTaskStats(); + if (vertexTaskStats != null) { + for(String key : vertexTaskStats.keySet()) { + atsEntity.addOtherInfo(key, vertexTaskStats.get(key)); + } + } + return atsEntity; } @@ -464,6 +473,7 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime()); atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString()); return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index b04b8d4..d2e366d 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -127,7 +127,7 @@ public class TestHistoryEventTimelineConversion { case VERTEX_FINISHED: event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR, - null, null, null); + null, null, null, null); break; case TASK_STARTED: event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());