tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2212. Notify components on DAG completion. (sseth)
Date Tue, 21 Apr 2015 04:05:26 GMT
Repository: tez
Updated Branches:
  refs/heads/master db0a50c5e -> 44046f8a4


TEZ-2212. Notify components on DAG completion. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/44046f8a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/44046f8a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/44046f8a

Branch: refs/heads/master
Commit: 44046f8a4ae7edab466f6c3789772c97955f5786
Parents: db0a50c
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Apr 20 21:05:04 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Apr 20 21:05:04 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/AsyncDispatcher.java  |  5 +++
 .../org/apache/tez/dag/records/TezDAGID.java    |  7 ++++
 .../tez/dag/records/TezTaskAttemptID.java       |  6 +++
 .../org/apache/tez/dag/records/TezTaskID.java   |  6 +++
 .../org/apache/tez/dag/records/TezVertexID.java |  6 +++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 39 ++++++++++++++++++++
 .../apache/tez/dag/app/TaskAttemptListener.java |  5 +++
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 16 ++++++++
 .../dag/event/DAGAppMasterEventDagCleanup.java  | 31 ++++++++++++++++
 .../app/dag/event/DAGAppMasterEventType.java    |  4 +-
 .../tez/dag/app/launcher/ContainerLauncher.java |  5 +++
 .../dag/app/launcher/ContainerLauncherImpl.java | 12 ++++++
 .../app/launcher/LocalContainerLauncher.java    |  9 +++++
 .../dag/app/rm/LocalTaskSchedulerService.java   |  2 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  7 +++-
 .../tez/dag/app/rm/TaskSchedulerService.java    |  2 +-
 .../dag/app/rm/YarnTaskSchedulerService.java    |  2 +-
 .../app/rm/container/AMContainerHelpers.java    |  5 +++
 .../dag/app/rm/container/AMContainerMap.java    |  6 +++
 .../tez/dag/app/rm/node/AMNodeTracker.java      |  6 +++
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 11 +++++-
 22 files changed, 187 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c44824c..64b3561 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2212. Notify components on DAG completion.
   TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads
   to tez.runtime.pipelined.sorter.sort.threads.
   TEZ-2333. Enable local fetch optimization by default.

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index b751260..5aaa4cf 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -326,4 +326,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
       }
     };
   }
+
+  @Private
+  public int getQueueSize() {
+    return eventQueue.size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index d12f760..0fe1d44 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.text.NumberFormat;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import com.google.common.base.Preconditions;
@@ -66,6 +67,12 @@ public class TezDAGID extends TezID {
     Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null");
     return dagIdCache.getUnchecked(new TezDAGID(applicationId, id));
   }
+
+  @InterfaceAudience.Private
+  public static void clearCache() {
+    dagIdCache.invalidateAll();
+    dagIdCache.cleanUp();
+  }
   
   /**
    * Get a DAGID object from given parts.

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
index 0896b99..296d577 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
@@ -74,6 +74,12 @@ public class TezTaskAttemptID extends TezID {
     return taskAttemptIDCache.getUnchecked(new TezTaskAttemptID(taskID, id));
   }
 
+  @InterfaceAudience.Private
+  public static void clearCache() {
+    taskAttemptIDCache.invalidateAll();
+    taskAttemptIDCache.cleanUp();
+  }
+
   private TezTaskAttemptID(TezTaskID taskId, int id) {
     super(id);
     if(taskId == null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index 0a7bc4a..b4c7b32 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -81,6 +81,12 @@ public class TezTaskID extends TezID {
     return taskIDCache.getUnchecked(new TezTaskID(vertexID, id));
   }
 
+  @InterfaceAudience.Private
+  public static void clearCache() {
+    taskIDCache.invalidateAll();
+    taskIDCache.cleanUp();
+  }
+
   private TezTaskID(TezVertexID vertexID, int id) {
     super(id);
     Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
index a9b2625..93ef551 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
@@ -83,6 +83,12 @@ public class TezVertexID extends TezID {
     return vertexIDCache.getUnchecked(new TezVertexID(dagId, id));
   }
 
+  @InterfaceAudience.Private
+  public static void clearCache() {
+    vertexIDCache.invalidateAll();
+    vertexIDCache.cleanUp();
+  }
+
   private TezVertexID(TezDAGID dagId, int id) {
     super(id);
     this.dagId = dagId;

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 2924b25..3dd9e4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -57,6 +57,9 @@ import java.util.regex.Pattern;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -661,7 +664,19 @@ public class DAGAppMaster extends AbstractService {
         }
         if (!state.equals(DAGAppMasterState.ERROR)) {
           if (!sessionStopped.get()) {
+            LOG.info("Central Dispatcher queue size after DAG completion, before cleanup: " +
+                dispatcher.getQueueSize());
             LOG.info("Waiting for next DAG to be submitted.");
+
+            // Sending this via the event queue, in case there are pending events which need to be
+            // processed. TaskKilled for example, or ContainerCompletions.
+            // The DAG needs to be part of the event, since the dag can get reset when the next
+            // dag is submitted. The next DAG, however, will not start executing till the cleanup
+            // is complete, since execution start is on the same dispatcher.
+            sendEvent(new DAGAppMasterEventDagCleanup(context.getCurrentDAG()));
+
+            // Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events.
+            // However, eventually it needs to be moved out.
             this.taskSchedulerEventHandler.dagCompleted();
             state = DAGAppMasterState.IDLE;
           } else {
@@ -689,6 +704,27 @@ public class DAGAppMaster extends AbstractService {
       this.state = DAGAppMasterState.KILLED;
       shutdownHandler.shutdown(true);
       break;
+    case DAG_CLEANUP:
+      DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
+      LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
+          cleanupEvent.getDag().getID());
+      containerLauncher.dagComplete(cleanupEvent.getDag());
+      taskAttemptListener.dagComplete(cleanupEvent.getDag());
+      nodes.dagComplete(cleanupEvent.getDag());
+      containers.dagComplete(cleanupEvent.getDag());
+      TezTaskAttemptID.clearCache();
+      TezTaskID.clearCache();
+      TezVertexID.clearCache();
+      TezDAGID.clearCache();
+      LOG.info("Completed cleanup for DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
+          cleanupEvent.getDag().getID());
+      break;
+    case NEW_DAG_SUBMITTED:
+      // Inform sub-components that a new DAG has been submitted.
+      taskSchedulerEventHandler.dagSubmitted();
+      containerLauncher.dagSubmitted();
+      taskAttemptListener.dagSubmitted();
+      break;
     default:
       throw new TezUncheckedException(
           "AppMaster: No handler for event type: " + event.getType());
@@ -2081,6 +2117,9 @@ public class DAGAppMaster extends AbstractService {
     // End of creating the job.
     ((RunningAppContext) context).setDAG(currentDAG);
 
+    // Send out an event to inform components that a new DAG has been submitted.
+    // Information about this DAG is available via the context.
+    sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.NEW_DAG_SUBMITTED));
     // create a job event for job initialization
     DAGEvent initDagEvent = new DAGEvent(currentDAG.getID(), DAGEventType.DAG_INIT);
     // Send init to the job (this does NOT trigger job execution)

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index aeb0cd5..9caa7cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 /**
@@ -38,4 +39,8 @@ public interface TaskAttemptListener {
   
   void unregisterTaskAttempt(TezTaskAttemptID attemptID);
 
+  void dagComplete(DAG dag);
+
+  void dagSubmitted();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index a01ca39..b64283b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -282,6 +282,22 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
+  public void dagComplete(DAG dag) {
+    // TODO TEZ-2335. Cleanup TaskHeartbeat handler structures.
+    // TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die.
+    // Container structures remain unchanged - since they could be re-used across restarts.
+    // This becomes more relevant when task kills without container kills are allowed.
+
+    // TODO TEZ-2336. Send a signal to containers indicating DAG completion.
+  }
+
+  @Override
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
+  }
+
+  @Override
   public void registerRunningContainer(ContainerId containerId) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("ContainerId: " + containerId

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDagCleanup.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDagCleanup.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDagCleanup.java
new file mode 100644
index 0000000..2d1a23a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDagCleanup.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.DAG;
+
+public class DAGAppMasterEventDagCleanup extends DAGAppMasterEvent {
+
+  private final DAG dag;
+
+  public DAGAppMasterEventDagCleanup(DAG dag) {
+    super(DAGAppMasterEventType.DAG_CLEANUP);
+    this.dag = dag;
+  }
+
+  public DAG getDag() {
+    return this.dag;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
index ee03ef5..5a102a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
@@ -22,5 +22,7 @@ public enum DAGAppMasterEventType {
   INTERNAL_ERROR,
   AM_REBOOT,
   DAG_FINISHED,
-  SCHEDULING_SERVICE_ERROR
+  SCHEDULING_SERVICE_ERROR,
+  NEW_DAG_SUBMITTED, // Indicates a new dag being submitted, to notify sub-components
+  DAG_CLEANUP
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
index 35cdeda..305eb50 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
@@ -20,8 +20,13 @@ package org.apache.tez.dag.app.launcher;
 
 
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 
 public interface ContainerLauncher 
     extends EventHandler<NMCommunicatorEvent> {
+
+    void dagComplete(DAG dag);
+
+    void dagSubmitted();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index a1eb2a7..94889a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -110,6 +111,17 @@ public class ContainerLauncherImpl extends AbstractService implements
     }
   }
 
+  @Override
+  public void dagComplete(DAG dag) {
+    // Nothing required at the moment. Containers are shared across DAGs
+  }
+
+  @Override
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
+  }
+
   private static enum ContainerState {
     PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 5c8aab6..9faf8c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -43,6 +43,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -154,6 +155,14 @@ public class LocalContainerLauncher extends AbstractService implements
     callbackExecutor.shutdownNow();
   }
 
+  @Override
+  public void dagComplete(DAG dag) {
+  }
+
+  @Override
+  public void dagSubmitted() {
+  }
+
   // Thread to monitor the queue of incoming NMCommunicator events
   private class TezSubTaskRunner implements Runnable {
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index ce01bfb..51d8b9d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -112,7 +112,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
   }
 
   @Override
-  public void resetMatchLocalityForAllHeldContainers() {
+  public void dagComplete() {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 250446d..19db660 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -563,7 +563,12 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   public void dagCompleted() {
-    taskScheduler.resetMatchLocalityForAllHeldContainers();
+    taskScheduler.dagComplete();
+  }
+
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
index 096069b..48d5455 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -43,7 +43,7 @@ public abstract class TaskSchedulerService extends AbstractService{
 
   public abstract int getClusterNodeCount();
 
-  public abstract void resetMatchLocalityForAllHeldContainers();
+  public abstract void dagComplete();
 
   public abstract Resource getTotalResources();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 66a6f33..44f5484 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -838,7 +838,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   }
 
   @Override
-  public synchronized void resetMatchLocalityForAllHeldContainers() {
+  public synchronized void dagComplete() {
     for (HeldContainer heldContainer : heldContainers.values()) {
       heldContainer.resetLocalityMatchLevel();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 41e814b..470fa56 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -65,6 +65,11 @@ public class AMContainerHelpers {
   private static Map<TezDAGID, ContainerLaunchContext> commonContainerSpecs =
       new HashMap<TezDAGID, ContainerLaunchContext>();
 
+  public static void dagComplete(TezDAGID dagId) {
+    synchronized (commonContainerSpecLock) {
+      commonContainerSpecs.remove(dagId);
+    }
+  }
 
   /**
    * Create a {@link LocalResource} record with all the given parameters.

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 6037a3a..574c38e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.rm.container;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.service.AbstractService;
@@ -74,4 +75,9 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
   public Collection<AMContainer> values() {
     return containerMap.values();
   }
+
+  public void dagComplete(DAG dag){
+    AMContainerHelpers.dagComplete(dag.getID());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index a0be825..102cbe9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -205,4 +206,9 @@ public class AMNodeTracker extends AbstractService implements
   public boolean isBlacklistingIgnored() {
     return this.ignoreBlacklisting;
   }
+
+  public void dagComplete(DAG dag) {
+    // TODO TEZ-2337 Maybe reset failures from previous DAGs
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index fca15fd..92dfdb5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -127,7 +128,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     AtomicBoolean startScheduling = new AtomicBoolean(true);
     AtomicBoolean goFlag;
     boolean updateProgress = true;
-    
+
     LinkedBlockingQueue<ContainerData> containersToProcess = new LinkedBlockingQueue<ContainerData>();
     
     Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap();
@@ -139,6 +140,14 @@ public class MockDAGAppMaster extends DAGAppMaster {
       this.goFlag = goFlag;
     }
 
+    @Override
+    public void dagComplete(DAG dag) {
+    }
+
+    @Override
+    public void dagSubmitted() {
+    }
+
     public class ContainerData {
       ContainerId cId;
       TezTaskAttemptID taId;


Mime
View raw message