Return-Path:
X-Original-To: apmail-tez-commits-archive@minotaur.apache.org
Delivered-To: apmail-tez-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id EAAB71846F
for ;
Sat, 22 Aug 2015 07:26:00 +0000 (UTC)
Received: (qmail 98805 invoked by uid 500); 22 Aug 2015 07:26:00 -0000
Delivered-To: apmail-tez-commits-archive@tez.apache.org
Received: (qmail 98701 invoked by uid 500); 22 Aug 2015 07:26:00 -0000
Mailing-List: contact commits-help@tez.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@tez.apache.org
Delivered-To: mailing list commits@tez.apache.org
Received: (qmail 98610 invoked by uid 99); 22 Aug 2015 07:26:00 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org)
(140.211.11.23)
by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 07:26:00 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at
git1-us-west.apache.org, from userid 33)
id 8CDB1DFD9E; Sat, 22 Aug 2015 07:26:00 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: sseth@apache.org
To: commits@tez.apache.org
Date: Sat, 22 Aug 2015 07:26:04 -0000
Message-Id: <33ccf58d18f54ffdb68603c32d5191a9@git.apache.org>
In-Reply-To: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org>
References: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [05/50] [abbrv] tez git commit: TEZ-2302. Allow TaskCommunicators to
subscribe for Vertex updates. (sseth)
TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0c8845c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0c8845c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0c8845c
Branch: refs/heads/master
Commit: f0c8845cb687d6b0af22b5800ca1e8947a34fac9
Parents: 952980a
Author: Siddharth Seth
Authored: Thu Apr 9 13:33:48 2015 -0700
Committer: Siddharth Seth
Committed: Fri Aug 21 18:13:54 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../apache/tez/dag/api/TaskCommunicator.java | 20 +++
.../tez/dag/api/TaskCommunicatorContext.java | 14 ++-
.../dag/app/TaskAttemptListenerImpTezDag.java | 52 +++-----
.../dag/app/TaskCommunicatorContextImpl.java | 124 +++++++++++++++++++
.../tez/dag/app/TezTaskCommunicatorImpl.java | 6 +
.../java/org/apache/tez/dag/app/dag/DAG.java | 2 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 5 +
8 files changed, 188 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9d6b220..ca5225e 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -15,5 +15,6 @@ ALL CHANGES:
TEZ-2283. Fixes after rebase 04/07.
TEZ-2284. Separate TaskReporter into an interface.
TEZ-2285. Allow TaskCommunicators to indicate task/container liveness.
+ TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 945091e..a2cd858 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -54,4 +55,23 @@ public abstract class TaskCommunicator extends AbstractService {
public abstract InetSocketAddress getAddress();
// TODO Eventually. Add methods here to support preemption of tasks.
+
+ /**
+ * Receive notifications on vertex state changes.
+ *
+ * State changes will be received based on the registration via {@link
+ * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
+ * java.util.Set)}. Notifications will be received for all registered state changes, and not just
+ * for the latest state update. They will be in order in which the state change occurred.
+ *
+ * Extensive processing should not be performed via this method call. Instead this should just be
+ * used as a notification mechanism.
+ *
This method may be invoked concurrently with other invocations into the TaskCommunicator and
+ * multi-threading/concurrency implications must be considered.
+ * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
+ * Additional information may be available for specific events, Look at the
+ * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+ * @throws Exception
+ */
+ public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 0c3bac3..19caed9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -16,10 +16,12 @@ package org.apache.tez.dag.api;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Set;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -48,7 +50,7 @@ public interface TaskCommunicatorContext {
void containerAlive(ContainerId containerId);
// TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
- void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
+ void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
// TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
@@ -56,6 +58,16 @@ public interface TaskCommunicatorContext {
// TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
+ /**
+ * Register to get notifications on updates to the specified vertex. Notifications will be sent
+ * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}
+ *
+ * This method can only be invoked once. Duplicate invocations will result in an error.
+ *
+ * @param vertexName the vertex name for which notifications are required.
+ * @param stateSet the set of states for which notifications are required. null implies all
+ */
+ void registerForVertexStateUpdates(String vertexName, @Nullable Set stateSet);
// TODO TEZ-2003 API. Should a method exist for task succeeded.
// TODO Eventually Add methods to report availability stats to the scheduler.
http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index a6994d2..386e4af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -17,6 +17,7 @@
package org.apache.tez.dag.app;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -26,6 +27,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -34,13 +36,13 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
@@ -66,14 +68,12 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.common.security.JobTokenSecretManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@SuppressWarnings("unchecked")
@InterfaceAudience.Private
public class TaskAttemptListenerImpTezDag extends AbstractService implements
- TaskAttemptListener, TaskCommunicatorContext {
+ TaskAttemptListener {
private static final Logger LOG = LoggerFactory
.getLogger(TaskAttemptListenerImpTezDag.class);
@@ -123,7 +123,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
- taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i]);
+ taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
}
// TODO TEZ-2118 Start using taskCommunicator indices properly
}
@@ -144,13 +144,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
}
- private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) {
+ private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) {
if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
LOG.info("Using Default Task Communicator");
- return new TezTaskCommunicatorImpl(this);
+ return new TezTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
} else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Using Default Local Task Communicator");
- return new TezLocalTaskCommunicatorImpl(this);
+ return new TezLocalTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
} else {
LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
Class extends TaskCommunicator> taskCommClazz = (Class extends TaskCommunicator>) ReflectionUtils
@@ -158,7 +158,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
try {
Constructor extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
ctor.setAccessible(true);
- return ctor.newInstance(this);
+ return ctor.newInstance(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
} catch (NoSuchMethodException e) {
throw new TezUncheckedException(e);
} catch (InvocationTargetException e) {
@@ -170,18 +170,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
}
}
-
- @Override
- public ApplicationAttemptId getApplicationAttemptId() {
- return context.getApplicationAttemptId();
- }
-
- @Override
- public Credentials getCredentials() {
- return context.getAppCredentials();
- }
-
- @Override
public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
throws IOException, TezException {
ContainerId containerId = ConverterUtils.toContainerId(request
@@ -251,30 +239,20 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
return new TaskHeartbeatResponse(false, outEvents);
}
-
- @Override
- public boolean isKnownContainer(ContainerId containerId) {
- return context.getAllContainers().get(containerId) != null;
- }
-
- @Override
public void taskAlive(TezTaskAttemptID taskAttemptId) {
taskHeartbeatHandler.pinged(taskAttemptId);
}
- @Override
public void containerAlive(ContainerId containerId) {
pingContainerHeartbeatHandler(containerId);
}
- @Override
public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
context.getEventHandler()
.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
pingContainerHeartbeatHandler(containerId);
}
- @Override
public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
String diagnostics) {
// Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
@@ -287,7 +265,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
taskAttemptEndReason)));
}
- @Override
public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
String diagnostics) {
// Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
@@ -300,6 +277,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
taskAttemptEndReason)));
}
+ public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
+ Exception {
+ taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+ }
+
/**
* Child checking whether it can commit.
@@ -309,7 +291,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
* {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the
* centralized commit protocol handling by the JobTracker.
*/
- @Override
+// @Override
public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
// An attempt is asking if it can commit its output. This can be decided
http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
new file mode 100644
index 0000000..3714c3c
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+@InterfaceAudience.Private
+public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, VertexStateUpdateListener {
+
+
+ private final AppContext context;
+ private final TaskAttemptListenerImpTezDag taskAttemptListener;
+ private final int taskCommunicatorIndex;
+
+ public TaskCommunicatorContextImpl(AppContext appContext,
+ TaskAttemptListenerImpTezDag taskAttemptListener,
+ int taskCommunicatorIndex) {
+ this.context = appContext;
+ this.taskAttemptListener = taskAttemptListener;
+ this.taskCommunicatorIndex = taskCommunicatorIndex;
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return context.getApplicationAttemptId();
+ }
+
+ @Override
+ public Credentials getCredentials() {
+ return context.getAppCredentials();
+ }
+
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
+ return taskAttemptListener.canCommit(taskAttemptId);
+ }
+
+ @Override
+ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException,
+ TezException {
+ return taskAttemptListener.heartbeat(request);
+ }
+
+ @Override
+ public boolean isKnownContainer(ContainerId containerId) {
+ return context.getAllContainers().get(containerId) != null;
+ }
+
+ @Override
+ public void taskAlive(TezTaskAttemptID taskAttemptId) {
+ taskAttemptListener.taskAlive(taskAttemptId);
+ }
+
+ @Override
+ public void containerAlive(ContainerId containerId) {
+ taskAttemptListener.containerAlive(containerId);
+ }
+
+ @Override
+ public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId) {
+ taskAttemptListener.taskStartedRemotely(taskAttemptId, containerId);
+ }
+
+ @Override
+ public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ @Nullable String diagnostics) {
+ taskAttemptListener.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics);
+ }
+
+ @Override
+ public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ @Nullable String diagnostics) {
+ taskAttemptListener.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics);
+
+ }
+
+ @Override
+ public void registerForVertexStateUpdates(String vertexName,
+ @Nullable Set stateSet) {
+ Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
+ context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this);
+ }
+
+
+ @Override
+ public void onStateUpdated(VertexStateUpdate event) {
+ try {
+ taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
+ } catch (Exception e) {
+ // TODO TEZ-2003 This needs to be propagated to the DAG as a user error.
+ throw new TezUncheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index a4a707b..fa2749a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -254,6 +255,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
return address;
}
+ @Override
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+ // Empty. Not registering, or expecting any updates.
+ }
+
protected String getTokenIdentifier() {
return tokenIdentifier;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 4c3426a..6d6872b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -94,4 +94,6 @@ public interface DAG {
Map getVertexNameIDMapping();
+ StateChangeNotifier getStateChangeNotifier();
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 3d44ba6..ef2df78 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -702,6 +702,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
@Override
+ public StateChangeNotifier getStateChangeNotifier() {
+ return entityUpdateTracker;
+ }
+
+ @Override
public TezCounters getAllCounters() {
readLock.lock();