Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EAAB71846F for ; Sat, 22 Aug 2015 07:26:00 +0000 (UTC) Received: (qmail 98805 invoked by uid 500); 22 Aug 2015 07:26:00 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 98701 invoked by uid 500); 22 Aug 2015 07:26:00 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 98610 invoked by uid 99); 22 Aug 2015 07:26:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 07:26:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8CDB1DFD9E; Sat, 22 Aug 2015 07:26:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 22 Aug 2015 07:26:04 -0000 Message-Id: <33ccf58d18f54ffdb68603c32d5191a9@git.apache.org> In-Reply-To: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> References: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/50] [abbrv] tez git commit: TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. (sseth) TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. 
(sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0c8845c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0c8845c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0c8845c Branch: refs/heads/master Commit: f0c8845cb687d6b0af22b5800ca1e8947a34fac9 Parents: 952980a Author: Siddharth Seth Authored: Thu Apr 9 13:33:48 2015 -0700 Committer: Siddharth Seth Committed: Fri Aug 21 18:13:54 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../apache/tez/dag/api/TaskCommunicator.java | 20 +++ .../tez/dag/api/TaskCommunicatorContext.java | 14 ++- .../dag/app/TaskAttemptListenerImpTezDag.java | 52 +++----- .../dag/app/TaskCommunicatorContextImpl.java | 124 +++++++++++++++++++ .../tez/dag/app/TezTaskCommunicatorImpl.java | 6 + .../java/org/apache/tez/dag/app/dag/DAG.java | 2 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 5 + 8 files changed, 188 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 9d6b220..ca5225e 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -15,5 +15,6 @@ ALL CHANGES: TEZ-2283. Fixes after rebase 04/07. TEZ-2284. Separate TaskReporter into an interface. TEZ-2285. Allow TaskCommunicators to indicate task/container liveness. + TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. 
INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index 945091e..a2cd858 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -54,4 +55,23 @@ public abstract class TaskCommunicator extends AbstractService { public abstract InetSocketAddress getAddress(); // TODO Eventually. Add methods here to support preemption of tasks. + + /** + * Receive notifications on vertex state changes. + *

+ * State changes will be received based on the registration via {@link + * org.apache.tez.dag.api.TaskCommunicatorContext#registerForVertexStateUpdates(String, + * java.util.Set)}. Notifications will be received for all registered state changes, and not just + * for the latest state update. They will be delivered in the order in which the state changes occurred.

+ * + * Extensive processing should not be performed via this method call. Instead this should just be + * used as a notification mechanism. + *
This method may be invoked concurrently with other invocations into the TaskCommunicator and + * multi-threading/concurrency implications must be considered. + * @param stateUpdate an event indicating the name of the vertex, and its updated state. + * Additional information may be available for specific events; look at the + * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate} + * @throws Exception + */ + public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception; } http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index 0c3bac3..19caed9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -16,10 +16,12 @@ package org.apache.tez.dag.api; import javax.annotation.Nullable; import java.io.IOException; +import java.util.Set; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -48,7 +50,7 @@ public interface TaskCommunicatorContext { void containerAlive(ContainerId containerId); // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* - void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId); + void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId); // TODO TEZ-2003 Move to vertex, taskIndex, version. 
Rename to taskAttempt* void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); @@ -56,6 +58,16 @@ public interface TaskCommunicatorContext { // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); + /** + * Register to get notifications on updates to the specified vertex. Notifications will be sent + * via {@link org.apache.tez.dag.api.TaskCommunicator#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}

+ * + * This method can only be invoked once. Duplicate invocations will result in an error. + * + * @param vertexName the vertex name for which notifications are required. + * @param stateSet the set of states for which notifications are required. null implies all + */ + void registerForVertexStateUpdates(String vertexName, @Nullable Set stateSet); // TODO TEZ-2003 API. Should a method exist for task succeeded. // TODO Eventually Add methods to report availability stats to the scheduler. http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index a6994d2..386e4af 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -17,6 +17,7 @@ package org.apache.tez.dag.app; +import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -26,6 +27,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -34,13 +36,13 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; +import com.google.common.base.Preconditions; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
-import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; @@ -66,14 +68,12 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.common.security.JobTokenSecretManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @SuppressWarnings("unchecked") @InterfaceAudience.Private public class TaskAttemptListenerImpTezDag extends AbstractService implements - TaskAttemptListener, TaskCommunicatorContext { + TaskAttemptListener { private static final Logger LOG = LoggerFactory .getLogger(TaskAttemptListenerImpTezDag.class); @@ -123,7 +123,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length]; for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) { - taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i]); + taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i); } // TODO TEZ-2118 Start using taskCommunicator indices properly } @@ -144,13 +144,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } - private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) { + private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) { if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Using Default Task Communicator"); - return new TezTaskCommunicatorImpl(this); + return new 
TezTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex)); } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Using Default Local Task Communicator"); - return new TezLocalTaskCommunicatorImpl(this); + return new TezLocalTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex)); } else { LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier); Class taskCommClazz = (Class) ReflectionUtils @@ -158,7 +158,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements try { Constructor ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class); ctor.setAccessible(true); - return ctor.newInstance(this); + return ctor.newInstance(new TaskCommunicatorContextImpl(context, this, taskCommIndex)); } catch (NoSuchMethodException e) { throw new TezUncheckedException(e); } catch (InvocationTargetException e) { @@ -170,18 +170,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } } - - @Override - public ApplicationAttemptId getApplicationAttemptId() { - return context.getApplicationAttemptId(); - } - - @Override - public Credentials getCredentials() { - return context.getAppCredentials(); - } - - @Override public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException { ContainerId containerId = ConverterUtils.toContainerId(request @@ -251,30 +239,20 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } return new TaskHeartbeatResponse(false, outEvents); } - - @Override - public boolean isKnownContainer(ContainerId containerId) { - return context.getAllContainers().get(containerId) != null; - } - - @Override public void taskAlive(TezTaskAttemptID taskAttemptId) { taskHeartbeatHandler.pinged(taskAttemptId); } - @Override public void containerAlive(ContainerId containerId) { pingContainerHeartbeatHandler(containerId); } - @Override 
public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) { context.getEventHandler() .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); pingContainerHeartbeatHandler(containerId); } - @Override public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, String diagnostics) { // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, @@ -287,7 +265,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements taskAttemptEndReason))); } - @Override public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, String diagnostics) { // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, @@ -300,6 +277,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements taskAttemptEndReason))); } + public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws + Exception { + taskCommunicators[taskCommIndex].onVertexStateUpdated(event); + } + /** * Child checking whether it can commit. @@ -309,7 +291,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the * centralized commit protocol handling by the JobTracker. */ - @Override +// @Override public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { LOG.info("Commit go/no-go request from " + taskAttemptId.toString()); // An attempt is asking if it can commit its output. 
This can be decided http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java new file mode 100644 index 0000000..3714c3c --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.api.TaskHeartbeatRequest; +import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.dag.VertexStateUpdateListener; +import org.apache.tez.dag.records.TezTaskAttemptID; + +@InterfaceAudience.Private +public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, VertexStateUpdateListener { + + + private final AppContext context; + private final TaskAttemptListenerImpTezDag taskAttemptListener; + private final int taskCommunicatorIndex; + + public TaskCommunicatorContextImpl(AppContext appContext, + TaskAttemptListenerImpTezDag taskAttemptListener, + int taskCommunicatorIndex) { + this.context = appContext; + this.taskAttemptListener = taskAttemptListener; + this.taskCommunicatorIndex = taskCommunicatorIndex; + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return context.getApplicationAttemptId(); + } + + @Override + public Credentials getCredentials() { + return context.getAppCredentials(); + } + + @Override + public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { + return taskAttemptListener.canCommit(taskAttemptId); + } + + @Override + public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, + TezException { 
+ return taskAttemptListener.heartbeat(request); + } + + @Override + public boolean isKnownContainer(ContainerId containerId) { + return context.getAllContainers().get(containerId) != null; + } + + @Override + public void taskAlive(TezTaskAttemptID taskAttemptId) { + taskAttemptListener.taskAlive(taskAttemptId); + } + + @Override + public void containerAlive(ContainerId containerId) { + taskAttemptListener.containerAlive(containerId); + } + + @Override + public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId) { + taskAttemptListener.taskStartedRemotely(taskAttemptId, containerId); + } + + @Override + public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + @Nullable String diagnostics) { + taskAttemptListener.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics); + } + + @Override + public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + @Nullable String diagnostics) { + taskAttemptListener.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics); + + } + + @Override + public void registerForVertexStateUpdates(String vertexName, + @Nullable Set stateSet) { + Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); + context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this); + } + + + @Override + public void onStateUpdated(VertexStateUpdate event) { + try { + taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex); + } catch (Exception e) { + // TODO TEZ-2003 This needs to be propagated to the DAG as a user error. 
+ throw new TezUncheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index a4a707b..fa2749a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -48,6 +48,7 @@ import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -254,6 +255,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { return address; } + @Override + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception { + // Empty. Not registering, or expecting any updates. 
+ } + protected String getTokenIdentifier() { return tokenIdentifier; } http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 4c3426a..6d6872b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -94,4 +94,6 @@ public interface DAG { Map getVertexNameIDMapping(); + StateChangeNotifier getStateChangeNotifier(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 3d44ba6..ef2df78 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -702,6 +702,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override + public StateChangeNotifier getStateChangeNotifier() { + return entityUpdateTracker; + } + + @Override public TezCounters getAllCounters() { readLock.lock();