tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [05/50] [abbrv] tez git commit: TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. (sseth)
Date Sat, 22 Aug 2015 07:26:04 GMT
TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0c8845c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0c8845c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0c8845c

Branch: refs/heads/master
Commit: f0c8845cb687d6b0af22b5800ca1e8947a34fac9
Parents: 952980a
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Apr 9 13:33:48 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Aug 21 18:13:54 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../apache/tez/dag/api/TaskCommunicator.java    |  20 +++
 .../tez/dag/api/TaskCommunicatorContext.java    |  14 ++-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  52 +++-----
 .../dag/app/TaskCommunicatorContextImpl.java    | 124 +++++++++++++++++++
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |   6 +
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   2 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   5 +
 8 files changed, 188 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9d6b220..ca5225e 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -15,5 +15,6 @@ ALL CHANGES:
   TEZ-2283. Fixes after rebase 04/07.
   TEZ-2284. Separate TaskReporter into an interface.
   TEZ-2285. Allow TaskCommunicators to indicate task/container liveness.
+  TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 945091e..a2cd858 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 
@@ -54,4 +55,23 @@ public abstract class TaskCommunicator extends AbstractService {
   public abstract InetSocketAddress getAddress();
 
   // TODO Eventually. Add methods here to support preemption of tasks.
+
+  /**
+   * Receive notifications on vertex state changes.
+   * <p/>
+   * State changes will be received based on the registration via {@link
+   * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
+   * java.util.Set)}. Notifications will be received for all registered state changes, and
not just
+   * for the latest state update. They will be in order in which the state change occurred.
</p>
+   *
+   * Extensive processing should not be performed via this method call. Instead this should
just be
+   * used as a notification mechanism.
+   * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator
and
+   * multi-threading/concurrency implications must be considered.
+   * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
+   *                    Additional information may be available for specific events, Look
at the
+   *                    type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+   * @throws Exception
+   */
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 0c3bac3..19caed9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -16,10 +16,12 @@ package org.apache.tez.dag.api;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.Set;
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 
@@ -48,7 +50,7 @@ public interface TaskCommunicatorContext {
   void containerAlive(ContainerId containerId);
 
   // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
-  void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
+  void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
 
   // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
   void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
@Nullable String diagnostics);
@@ -56,6 +58,16 @@ public interface TaskCommunicatorContext {
   // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
   void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
@Nullable String diagnostics);
 
+  /**
+   * Register to get notifications on updates to the specified vertex. Notifications will
be sent
+   * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}
</p>
+   *
+   * This method can only be invoked once. Duplicate invocations will result in an error.
+   *
+   * @param vertexName the vertex name for which notifications are required.
+   * @param stateSet   the set of states for which notifications are required. null implies
all
+   */
+  void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState>
stateSet);
   // TODO TEZ-2003 API. Should a method exist for task succeeded.
 
   // TODO Eventually Add methods to report availability stats to the scheduler.

http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index a6994d2..386e4af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -17,6 +17,7 @@
 
 package org.apache.tez.dag.app;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -26,6 +27,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -34,13 +36,13 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
@@ -66,14 +68,12 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 @SuppressWarnings("unchecked")
 @InterfaceAudience.Private
 public class TaskAttemptListenerImpTezDag extends AbstractService implements
-    TaskAttemptListener, TaskCommunicatorContext {
+    TaskAttemptListener {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(TaskAttemptListenerImpTezDag.class);
@@ -123,7 +123,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
     for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
-      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i]);
+      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i],
i);
     }
     // TODO TEZ-2118 Start using taskCommunicator indices properly
   }
@@ -144,13 +144,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
   }
 
-  private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) {
+  private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex)
{
     if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT))
{
       LOG.info("Using Default Task Communicator");
-      return new TezTaskCommunicatorImpl(this);
+      return new TezTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
     } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT))
{
       LOG.info("Using Default Local Task Communicator");
-      return new TezLocalTaskCommunicatorImpl(this);
+      return new TezLocalTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this,
taskCommIndex));
     } else {
       LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
       Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>)
ReflectionUtils
@@ -158,7 +158,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       try {
         Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
         ctor.setAccessible(true);
-        return ctor.newInstance(this);
+        return ctor.newInstance(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
       } catch (InvocationTargetException e) {
@@ -170,18 +170,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       }
     }
   }
-
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return context.getApplicationAttemptId();
-  }
-
-  @Override
-  public Credentials getCredentials() {
-    return context.getAppCredentials();
-  }
-
-  @Override
   public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
       throws IOException, TezException {
     ContainerId containerId = ConverterUtils.toContainerId(request
@@ -251,30 +239,20 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     return new TaskHeartbeatResponse(false, outEvents);
   }
-
-  @Override
-  public boolean isKnownContainer(ContainerId containerId) {
-    return context.getAllContainers().get(containerId) != null;
-  }
-
-  @Override
   public void taskAlive(TezTaskAttemptID taskAttemptId) {
     taskHeartbeatHandler.pinged(taskAttemptId);
   }
 
-  @Override
   public void containerAlive(ContainerId containerId) {
     pingContainerHeartbeatHandler(containerId);
   }
 
-  @Override
   public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId)
{
     context.getEventHandler()
         .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
     pingContainerHeartbeatHandler(containerId);
   }
 
-  @Override
   public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
                          String diagnostics) {
     // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
@@ -287,7 +265,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         taskAttemptEndReason)));
   }
 
-  @Override
   public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
                          String diagnostics) {
     // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
@@ -300,6 +277,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         taskAttemptEndReason)));
   }
 
+  public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex)
throws
+      Exception {
+    taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+  }
+
 
   /**
    * Child checking whether it can commit.
@@ -309,7 +291,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
    * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the
    * centralized commit protocol handling by the JobTracker.
    */
-  @Override
+//  @Override
   public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
     LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
     // An attempt is asking if it can commit its output. This can be decided

http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
new file mode 100644
index 0000000..3714c3c
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+@InterfaceAudience.Private
+public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, VertexStateUpdateListener
{
+
+
+  private final AppContext context;
+  private final TaskAttemptListenerImpTezDag taskAttemptListener;
+  private final int taskCommunicatorIndex;
+
+  public TaskCommunicatorContextImpl(AppContext appContext,
+                                     TaskAttemptListenerImpTezDag taskAttemptListener,
+                                     int taskCommunicatorIndex) {
+    this.context = appContext;
+    this.taskAttemptListener = taskAttemptListener;
+    this.taskCommunicatorIndex = taskCommunicatorIndex;
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return context.getApplicationAttemptId();
+  }
+
+  @Override
+  public Credentials getCredentials() {
+    return context.getAppCredentials();
+  }
+
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
+    return taskAttemptListener.canCommit(taskAttemptId);
+  }
+
+  @Override
+  public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException,
+      TezException {
+    return taskAttemptListener.heartbeat(request);
+  }
+
+  @Override
+  public boolean isKnownContainer(ContainerId containerId) {
+    return context.getAllContainers().get(containerId) != null;
+  }
+
+  @Override
+  public void taskAlive(TezTaskAttemptID taskAttemptId) {
+    taskAttemptListener.taskAlive(taskAttemptId);
+  }
+
+  @Override
+  public void containerAlive(ContainerId containerId) {
+    taskAttemptListener.containerAlive(containerId);
+  }
+
+  @Override
+  public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId)
{
+    taskAttemptListener.taskStartedRemotely(taskAttemptId, containerId);
+  }
+
+  @Override
+  public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                         @Nullable String diagnostics) {
+    taskAttemptListener.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics);
+  }
+
+  @Override
+  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                         @Nullable String diagnostics) {
+    taskAttemptListener.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics);
+
+  }
+
+  @Override
+  public void registerForVertexStateUpdates(String vertexName,
+                                            @Nullable Set<VertexState> stateSet) {
+    Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
+    context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName,
stateSet, this);
+  }
+
+
+  @Override
+  public void onStateUpdated(VertexStateUpdate event) {
+    try {
+      taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
+    } catch (Exception e) {
+      // TODO TEZ-2003 This needs to be propagated to the DAG as a user error.
+      throw new TezUncheckedException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index a4a707b..fa2749a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -254,6 +255,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     return address;
   }
 
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    // Empty. Not registering, or expecting any updates.
+  }
+
   protected String getTokenIdentifier() {
     return tokenIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 4c3426a..6d6872b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -94,4 +94,6 @@ public interface DAG {
 
   Map<String, TezVertexID> getVertexNameIDMapping();
 
+  StateChangeNotifier getStateChangeNotifier();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f0c8845c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 3d44ba6..ef2df78 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -702,6 +702,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public StateChangeNotifier getStateChangeNotifier() {
+    return entityUpdateTracker;
+  }
+
+  @Override
   public TezCounters getAllCounters() {
 
     readLock.lock();


Mime
View raw message