Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 935421886C for ; Thu, 6 Aug 2015 09:25:54 +0000 (UTC) Received: (qmail 53733 invoked by uid 500); 6 Aug 2015 09:25:54 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 53622 invoked by uid 500); 6 Aug 2015 09:25:54 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 52851 invoked by uid 99); 6 Aug 2015 09:25:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Aug 2015 09:25:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C96C2E6825; Thu, 6 Aug 2015 09:25:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 06 Aug 2015 09:26:11 -0000 Message-Id: <6f125e406a1d41378605307d8b5d59f6@git.apache.org> In-Reply-To: <10a0859464864279b3a9f48415e4a976@git.apache.org> References: <10a0859464864279b3a9f48415e4a976@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/51] [abbrv] tez git commit: TEZ-2347. Expose additional information in TaskCommunicatorContext. (sseth) TEZ-2347. Expose additional information in TaskCommunicatorContext. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e8c0c7a8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e8c0c7a8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e8c0c7a8 Branch: refs/heads/TEZ-2003 Commit: e8c0c7a8a635903dd5809807cc94469ff8b3e88a Parents: 07e5e42 Author: Siddharth Seth Authored: Mon Apr 20 13:17:31 2015 -0700 Committer: Siddharth Seth Committed: Thu Aug 6 01:25:10 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../tez/dag/api/TaskCommunicatorContext.java | 50 ++++++++++++++++++++ .../dag/app/TaskCommunicatorContextImpl.java | 50 ++++++++++++++++++++ .../java/org/apache/tez/dag/app/dag/DAG.java | 2 + .../java/org/apache/tez/dag/app/dag/Task.java | 2 + .../org/apache/tez/dag/app/dag/TaskAttempt.java | 6 +++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 ++++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 12 +++++ .../apache/tez/dag/app/dag/impl/TaskImpl.java | 13 ++++- 9 files changed, 145 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index ca5225e..7c13110 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -16,5 +16,6 @@ ALL CHANGES: TEZ-2284. Separate TaskReporter into an interface. TEZ-2285. Allow TaskCommunicators to indicate task/container liveness. TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. + TEZ-2347. Expose additional information in TaskCommunicatorContext. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index 19caed9..56345ab 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -16,6 +16,7 @@ package org.apache.tez.dag.api; import javax.annotation.Nullable; import java.io.IOException; +import java.util.Collection; import java.util.Set; import org.apache.hadoop.security.Credentials; @@ -71,4 +72,53 @@ public interface TaskCommunicatorContext { // TODO TEZ-2003 API. Should a method exist for task succeeded. // TODO Eventually Add methods to report availability stats to the scheduler. + + /** + * Get the name of the currently executing dag + * @return the name of the currently executing dag + */ + String getCurretnDagName(); + + /** + * Get the name of the Input vertices for the specified vertex. + * Root Inputs are not returned. + * @param vertexName the vertex for which source vertex names will be returned + * @return an Iterable containing the list of input vertices for the specified vertex + */ + Iterable getInputVertexNames(String vertexName); + + /** + * Get the total number of tasks in the given vertex + * @param vertexName + * @return total number of tasks in this vertex + */ + int getVertexTotalTaskCount(String vertexName); + + /** + * Get the number of completed tasks for a given vertex + * @param vertexName the vertex name + * @return the number of completed tasks for the vertex + */ + int getVertexCompletedTaskCount(String vertexName); + + /** + * Get the number of running tasks for a given vertex + * @param vertexName the vertex name + * @return the number of running tasks for the vertex + */ + int getVertexRunningTaskCount(String vertexName); + + /** + * Get the start time for the first attempt of the specified task + * @param vertexName the vertex to which the task belongs + * @param taskIndex the index of the task + * @return the start time for the first attempt of the task + */ + long getFirstAttemptStartTime(String vertexName, int taskIndex); + + /** + * Get the start time for the currently executing DAG + * @return time when the current dag started executing + */ + long getDagStartTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 3714c3c..4cb0c93 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -18,7 +18,9 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Set; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -31,6 +33,7 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexStateUpdateListener; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -111,6 +114,53 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this); } + @Override + public String getCurretnDagName() { + return context.getCurrentDAG().getName(); + } + + @Override + public Iterable getInputVertexNames(String vertexName) { + Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); + Vertex vertex = context.getCurrentDAG().getVertex(vertexName); + Set sources = vertex.getInputVertices().keySet(); + return Iterables.transform(sources, new Function() { + @Override + public String apply(@Nullable Vertex input) { + return input.getName(); + } + }); + } + + @Override + public int getVertexTotalTaskCount(String vertexName) { + Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); + return context.getCurrentDAG().getVertex(vertexName).getTotalTasks(); + } + + @Override + public int getVertexCompletedTaskCount(String vertexName) { + Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); + return context.getCurrentDAG().getVertex(vertexName).getCompletedTasks(); + } + + @Override + public int getVertexRunningTaskCount(String vertexName) { + Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); + return context.getCurrentDAG().getVertex(vertexName).getRunningTasks(); + } + + @Override + public long getFirstAttemptStartTime(String vertexName, int taskIndex) { + Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); + Preconditions.checkArgument(taskIndex >=0, "TaskIndex must be > 0"); + return context.getCurrentDAG().getVertex(vertexName).getTask(taskIndex).getFirstAttemptStartTime(); + } + + @Override + public long getDagStartTime() { + return context.getCurrentDAG().getStartTime(); + } @Override public void onStateUpdated(VertexStateUpdate event) { http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 6d6872b..458362f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -94,6 +94,8 @@ public interface DAG { Map getVertexNameIDMapping(); + long getStartTime(); + StateChangeNotifier getStateChangeNotifier(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java index 47b56f2..a011b61 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java @@ -72,4 +72,6 @@ public interface Task { public TaskSpec getBaseTaskSpec(); public TaskLocationHint getTaskLocationHint(); + + long getFirstAttemptStartTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index 4360cc3..cbe72c1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -125,6 +125,12 @@ public interface TaskAttempt { */ long getLaunchTime(); + /** + * Get the time at which this attempt was scheduled + * @return the time at which this attempt was scheduled, 0 if it hasn't been scheduled yet + */ + long getScheduleTime(); + /** * @return attempt's finish time. If attempt is not finished * yet, returns 0. http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index ef2df78..e37fc2f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -702,6 +702,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override + public long getStartTime() { + readLock.lock(); + try { + return this.startTime; + } finally { + readLock.unlock(); + } + } + + @Override public StateChangeNotifier getStateChangeNotifier() { return entityUpdateTracker; } http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index e70123e..5085691 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -133,6 +133,7 @@ public class TaskAttemptImpl implements TaskAttempt, protected final AppContext appContext; private final TaskHeartbeatHandler taskHeartbeatHandler; private long launchTime = 0; + private long scheduleTime = 0; private long finishTime = 0; private String trackerName; private int httpPort; @@ -670,6 +671,16 @@ public class TaskAttemptImpl implements TaskAttempt, } @Override + public long getScheduleTime() { + readLock.lock(); + try { + return scheduleTime; + } finally { + readLock.unlock(); + } + } + + @Override public long getFinishTime() { readLock.lock(); try { @@ -1029,6 +1040,7 @@ public class TaskAttemptImpl implements TaskAttempt, public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) { TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event; + ta.scheduleTime = ta.clock.getTime(); // TODO Creating the remote task here may not be required in case of // recovery. http://git-wip-us.apache.org/repos/asf/tez/blob/e8c0c7a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index e6027f5..93b4c3f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -1529,7 +1529,18 @@ public class TaskImpl implements Task, EventHandler { this.writeLock.unlock(); } } - + + @Override + public long getFirstAttemptStartTime() { + readLock.lock(); + try { + // The first attempt will always have an index of 0. + return getAttempt(TezTaskAttemptID.getInstance(getTaskId(), 0)).getScheduleTime(); + } finally { + readLock.unlock(); + } + } + private static class KillTransition implements SingleArcTransition { @Override