tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [15/26] tez git commit: TEZ-2123. Fix component managers to use pluggable components. Enable hybrid mode. (sseth)
Date Thu, 09 Apr 2015 20:33:11 GMT
TEZ-2123. Fix component managers to use pluggable components. Enable
hybrid mode. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5c2818e0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5c2818e0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5c2818e0

Branch: refs/heads/TEZ-2003
Commit: 5c2818e0b8ddd2a5b070c1f84d029f75e3527c5b
Parents: a2006c9
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Feb 20 11:59:03 2015 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Apr 9 13:32:31 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   4 +-
 .../apache/tez/dag/app/TaskAttemptListener.java |  12 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  31 ++--
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   4 +-
 .../TezRootInputInitializerContextImpl.java     |   2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |   2 +-
 .../app/launcher/ContainerLauncherRouter.java   |   2 +-
 .../app/launcher/LocalContainerLauncher.java    |  10 +-
 .../rm/AMSchedulerEventDeallocateContainer.java |   7 +-
 .../rm/AMSchedulerEventNodeBlacklistUpdate.java |   8 +-
 .../tez/dag/app/rm/AMSchedulerEventTAEnded.java |  10 +-
 .../dag/app/rm/LocalTaskSchedulerService.java   |  19 ++-
 .../tez/dag/app/rm/NMCommunicatorEvent.java     |  12 +-
 .../rm/NMCommunicatorLaunchRequestEvent.java    |  11 +-
 .../app/rm/NMCommunicatorStopRequestEvent.java  |   4 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 151 ++++++++++++-----
 .../tez/dag/app/rm/container/AMContainer.java   |   3 +
 .../AMContainerEventLaunchRequest.java          |  15 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  39 +++--
 .../dag/app/rm/container/AMContainerMap.java    |   4 +-
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  |   6 +-
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   2 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |  22 +--
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  69 ++++----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   8 +-
 .../tez/dag/app/rm/TestContainerReuse.java      |  34 ++--
 .../tez/dag/app/rm/TestLocalTaskScheduler.java  |   2 +-
 .../app/rm/TestLocalTaskSchedulerService.java   |  18 ++-
 .../app/rm/TestTaskSchedulerEventHandler.java   |  11 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |   2 +-
 .../dag/app/rm/container/TestAMContainer.java   | 108 +++++++------
 .../app/rm/container/TestAMContainerMap.java    |   6 +-
 .../org/apache/tez/examples/JoinValidate.java   |  30 +++-
 .../TezTestServiceContainerLauncher.java        |   5 +-
 .../rm/TezTestServiceTaskSchedulerService.java  | 100 ++----------
 .../tez/examples/JoinValidateConfigured.java    |  53 ++++++
 .../tez/tests/TestExternalTezServices.java      | 160 ++++++++++++++-----
 39 files changed, 630 insertions(+), 359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 4bfe08f..1a2264c 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -4,5 +4,6 @@ ALL CHANGES:
   TEZ-2090. Add tests for jobs running in external services.
   TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
   TEZ-2122. Setup pluggable components at AM/Vertex level.
+  TEZ-2123. Fix component managers to use pluggable components. (Enable hybrid mode)
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index cce945a..04329b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -496,7 +496,7 @@ public class DAGAppMaster extends AbstractService {
 
     this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
-        taskSchedulerClassIdentifiers);
+        taskSchedulerClassIdentifiers, isLocal);
     addIfService(taskSchedulerEventHandler, true);
 
     if (enableWebUIService()) {
@@ -2188,6 +2188,7 @@ public class DAGAppMaster extends AbstractService {
   // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT
   private String[] parsePlugins(BiMap<String, Integer> pluginMap, String[] pluginStrings,
                                    String context) {
+    // TODO TEZ-2003 Duplicate error checking - ideally in the client itself. Depends on the final API.
     Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0,
         "Plugin strings should not be null or empty: " + context);
 
@@ -2225,6 +2226,7 @@ public class DAGAppMaster extends AbstractService {
       }
       pluginMap.put(identifierString, index);
       classNames[index] = className;
+      index++;
     }
     return classNames;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index aeb0cd5..51f3ff7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 /**
@@ -28,14 +29,13 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
  */
 public interface TaskAttemptListener {
 
-  InetSocketAddress getAddress();
+  void registerRunningContainer(ContainerId containerId, int taskCommId);
 
-  void registerRunningContainer(ContainerId containerId);
-
-  void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId);
+  void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
   
-  void unregisterRunningContainer(ContainerId containerId);
+  void unregisterRunningContainer(ContainerId containerId, int taskCommId);
   
-  void unregisterTaskAttempt(TezTaskAttemptID attemptID);
+  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId);
 
+  TaskCommunicator getTaskCommunicator(int taskCommIndex);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index a465447..d8d186a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -251,15 +251,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     return task.canCommit(taskAttemptId);
   }
 
-  @Override
-  public InetSocketAddress getAddress() {
-    return taskCommunicators[0].getAddress();
-  }
-
   // The TaskAttemptListener register / unregister methods in this class are not thread safe.
   // The Tez framework should not invoke these methods from multiple threads.
   @Override
-  public void registerRunningContainer(ContainerId containerId) {
+  public void registerRunningContainer(ContainerId containerId, int taskCommId) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
     }
@@ -269,11 +264,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           "Multiple registrations for containerId: " + containerId);
     }
     NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
-    taskCommunicators[0].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
+    taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
+        nodeId.getPort());
   }
 
   @Override
-  public void unregisterRunningContainer(ContainerId containerId) {
+  public void unregisterRunningContainer(ContainerId containerId, int taskCommId) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
     }
@@ -281,12 +277,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     if (containerInfo.taskAttemptId != null) {
       registeredAttempts.remove(containerInfo.taskAttemptId);
     }
-    taskCommunicators[0].registerContainerEnd(containerId);
+    taskCommunicators[taskCommId].registerContainerEnd(containerId);
   }
 
   @Override
   public void registerTaskAttempt(AMContainerTask amContainerTask,
-                                  ContainerId containerId) {
+                                  ContainerId containerId, int taskCommId) {
     ContainerInfo containerInfo = registeredContainers.get(containerId);
     if (containerInfo == null) {
       throw new TezUncheckedException("Registering task attempt: "
@@ -316,13 +312,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
           + " when already assigned to: " + containerIdFromMap);
     }
-    taskCommunicators[0].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+    taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
         amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
         amContainerTask.haveCredentialsChanged());
   }
 
   @Override
-  public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId) {
     ContainerId containerId = registeredAttempts.remove(attemptId);
     if (containerId == null) {
       LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
@@ -336,7 +332,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
     registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    taskCommunicators[0].unregisterRunningTaskAttempt(attemptId);
+    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId);
+  }
+
+  @Override
+  public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
+    return taskCommunicators[taskCommIndex];
   }
 
   private void pingContainerHeartbeatHandler(ContainerId containerId) {
@@ -352,8 +353,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           + ", ContainerId not known for this attempt");
     }
   }
-
-  public TaskCommunicator getTaskCommunicator() {
-    return taskCommunicators[0];
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e4705ea..e1851c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1207,7 +1207,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       // Inform the scheduler
       if (sendSchedulerEvent()) {
         ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
-            .getTaskAttemptState()));
+            .getTaskAttemptState(), ta.getVertex().getTaskSchedulerIdentifier()));
       }
     }
   }
@@ -1288,7 +1288,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // Inform the Scheduler.
       ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
-          TaskAttemptState.SUCCEEDED));
+          TaskAttemptState.SUCCEEDED, ta.getVertex().getTaskSchedulerIdentifier()));
 
       // Inform the task.
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index d4ef4d5..4ca4024 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -96,7 +96,7 @@ public class TezRootInputInitializerContextImpl implements
 
   @Override
   public Resource getTotalAvailableResource() {
-    return appContext.getTaskScheduler().getTotalResources();
+    return appContext.getTaskScheduler().getTotalResources(vertex.getTaskSchedulerIdentifier());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5ea98d0..a866203 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4043,7 +4043,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         eventHandler, getTotalTasks(),
         appContext.getTaskScheduler().getNumClusterNodes(),
         getTaskResource(),
-        appContext.getTaskScheduler().getTotalResources());
+        appContext.getTaskScheduler().getTotalResources(taskSchedulerIdentifier));
     List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
         inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size());
     for (String inputName : inputsWithInitializers) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 4bf51a1..311d3d9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -257,7 +257,7 @@ public class VertexManager {
     @Override
     public synchronized Resource getTotalAvailableResource() {
       checkAndThrowIfDone();
-      return appContext.getTaskScheduler().getTotalResources();
+      return appContext.getTaskScheduler().getTotalResources(managedVertex.getTaskSchedulerIdentifier());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 621e4a8..4f9b5bf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -124,6 +124,6 @@ public class ContainerLauncherRouter extends AbstractService
 
   @Override
   public void handle(NMCommunicatorEvent event) {
-    containerLaunchers[0].handle(event);
+    containerLaunchers[event.getLauncherId()].handle(event);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index b019875..6db48cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -58,7 +58,6 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.TaskAttemptListenerImpTezDag;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
@@ -87,9 +86,9 @@ public class LocalContainerLauncher extends AbstractService implements
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class);
   private final AppContext context;
-  private final TezTaskUmbilicalProtocol taskUmbilicalProtocol;
   private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
   private final String workingDirectory;
+  private final TaskAttemptListener tal;
   private final Map<String, String> localEnv = new HashMap<String, String>();
   private final ExecutionContext executionContext;
 
@@ -114,9 +113,8 @@ public class LocalContainerLauncher extends AbstractService implements
                                 String workingDirectory) throws UnknownHostException {
     super(LocalContainerLauncher.class.getName());
     this.context = context;
-    TaskAttemptListenerImpTezDag taListener = (TaskAttemptListenerImpTezDag)taskAttemptListener;
-    TezTaskCommunicatorImpl taskComm = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator();
-    this.taskUmbilicalProtocol = taskComm.getUmbilical();
+    this.tal = taskAttemptListener;
+
     this.workingDirectory = workingDirectory;
     AuxiliaryServiceHelper.setServiceDataIntoEnv(
         ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
@@ -209,7 +207,7 @@ public class LocalContainerLauncher extends AbstractService implements
         tezChild =
             createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
                 context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
-                taskUmbilicalProtocol,
+                ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(event.getTaskCommId())).getUmbilical(),
                 TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
       } catch (InterruptedException e) {
         handleLaunchFailed(e, event.getContainerId());

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
index 1b51920..5270aa2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
@@ -23,15 +23,20 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent {
 
   private final ContainerId containerId;
+  private final int schedulerId;
   
-  public AMSchedulerEventDeallocateContainer(ContainerId containerId) {
+  public AMSchedulerEventDeallocateContainer(ContainerId containerId, int schedulerId) {
     super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
     this.containerId = containerId;
+    this.schedulerId = schedulerId;
   }
   
   public ContainerId getContainerId() {
     return this.containerId;
   }
 
+  public int getSchedulerId() {
+    return schedulerId;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
index ed7ebc3..679705a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
@@ -23,14 +23,20 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent {
 
   private final NodeId nodeId;
+  private final int schedulerId;
 
-  public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add) {
+  public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add, int schedulerId) {
     super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED
         : AMSchedulerEventType.S_NODE_UNBLACKLISTED));
     this.nodeId = nodeId;
+    this.schedulerId = schedulerId;
   }
 
   public NodeId getNodeId() {
     return this.nodeId;
   }
+
+  public int getSchedulerId() {
+    return schedulerId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index 90e76b7..2ace642 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -26,14 +26,16 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
 
   private final TaskAttempt attempt;
   private final ContainerId containerId;
-  private TaskAttemptState state;
+  private final TaskAttemptState state;
+  private final int schedulerId;
 
   public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
-      TaskAttemptState state) {
+      TaskAttemptState state, int schedulerId) {
     super(AMSchedulerEventType.S_TA_ENDED);
     this.attempt = attempt;
     this.containerId = containerId;
     this.state = state;
+    this.schedulerId = schedulerId;
   }
 
   public TezTaskAttemptID getAttemptID() {
@@ -51,4 +53,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
   public ContainerId getUsedContainerId() {
     return this.containerId;
   }
+
+  public int getSchedulerId() {
+    return schedulerId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index ce01bfb..ecb7ad7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -63,10 +64,11 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
   final int appHostPort;
   final String appTrackingUrl;
   final AppContext appContext;
+  final long customContainerAppId;
 
   public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient,
       ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
-      int appHostPort, String appTrackingUrl, AppContext appContext) {
+      int appHostPort, String appTrackingUrl, long customContainerAppId, AppContext appContext) {
     super(LocalTaskSchedulerService.class.getName());
     this.realAppClient = appClient;
     this.appCallbackExecutor = createAppCallbackExecutorService();
@@ -78,6 +80,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
     this.appContext = appContext;
     taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
     taskAllocations = new LinkedHashMap<Object, Container>();
+    this.customContainerAppId = customContainerAppId;
   }
 
   private ExecutorService createAppCallbackExecutorService() {
@@ -164,7 +167,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
 
   protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
     return new AsyncDelegateRequestHandler(taskRequestQueue,
-        new LocalContainerFactory(appContext),
+        new LocalContainerFactory(appContext, customContainerAppId),
         taskAllocations,
         appClientDelegate,
         conf);
@@ -195,17 +198,19 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
   }
 
   static class LocalContainerFactory {
-    final AppContext appContext;
     AtomicInteger nextId;
+    final ApplicationAttemptId customAppAttemptId;
 
-    public LocalContainerFactory(AppContext appContext) {
-      this.appContext = appContext;
+    public LocalContainerFactory(AppContext appContext, long appIdLong) {
       this.nextId = new AtomicInteger(1);
+      ApplicationId appId = ApplicationId
+          .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+      this.customAppAttemptId = ApplicationAttemptId
+          .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
     }
 
     public Container createContainer(Resource capability, Priority priority) {
-      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
-      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
       NodeId nodeId = NodeId.newInstance("127.0.0.1", 0);
       String nodeHttpAddress = "127.0.0.1:0";
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
index 8bdeb28..f86894f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
@@ -28,13 +28,15 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
   private final ContainerId containerId;
   private final NodeId nodeId;
   private final Token containerToken;
+  private final int launcherId;
 
   public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
-      Token containerToken, NMCommunicatorEventType type) {
+      Token containerToken, NMCommunicatorEventType type, int launcherId) {
     super(type);
     this.containerId = containerId;
     this.nodeId = nodeId;
     this.containerToken = containerToken;
+    this.launcherId = launcherId;
   }
 
   public ContainerId getContainerId() {
@@ -48,10 +50,14 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
   public Token getContainerToken() {
     return this.containerToken;
   }
-  
+
+  public int getLauncherId() {
+    return launcherId;
+  }
+
   public String toSrting() {
     return super.toString() + " for container " + containerId + ", nodeId: "
-        + nodeId;
+        + nodeId + ", launcherId: " + launcherId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
index c3b12c0..a38345c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
@@ -25,13 +25,16 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
 
   private final ContainerLaunchContext clc;
   private final Container container;
+  // The task communicator index for the specific container being launched.
+  private final int taskCommId;
 
   public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
-      Container container) {
+      Container container, int launcherId, int taskCommId) {
     super(container.getId(), container.getNodeId(), container
-        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, launcherId);
     this.clc = clc;
     this.container = container;
+    this.taskCommId = taskCommId;
   }
 
   public ContainerLaunchContext getContainerLaunchContext() {
@@ -42,6 +45,10 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
     return container;
   }
 
+  public int getTaskCommId() {
+    return taskCommId;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
index 277d1e7..c9b5c44 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.api.records.Token;
 public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
 
   public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
-      Token containerToken) {
+      Token containerToken, int launcherId) {
     super(containerId, nodeId, containerToken,
-        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index caa41df..b69c81a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -110,9 +110,22 @@ public class TaskSchedulerEventHandler extends AbstractService
   private final String[] taskSchedulerClasses;
   protected final TaskSchedulerService []taskSchedulers;
 
+  private final boolean isPureLocalMode;
+  // If running in non local-only mode, the YARN task scheduler will always run to take care of
+  // registration with YARN and heartbeats to YARN.
+  // Splitting registration and heartbeats is not straigh-forward due to the taskScheduler being
+  // tied to a ContainerRequestType.
+  private final int yarnTaskSchedulerIndex;
+  // Custom AppIds to avoid container conflicts if there's multiple sources
+  private final long SCHEDULER_APP_ID_BASE = 111101111;
+  private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
+
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
 
+  // Not tracking container / task to schedulerId. Instead relying on everything flowing through
+  // the system and being propagated back via events.
+
   /**
    *
    * @param appContext
@@ -127,7 +140,7 @@ public class TaskSchedulerEventHandler extends AbstractService
   public TaskSchedulerEventHandler(AppContext appContext,
       DAGClientServer clientService, EventHandler eventHandler, 
       ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
-      String [] schedulerClasses) {
+      String [] schedulerClasses, boolean isPureLocalMode) {
     super(TaskSchedulerEventHandler.class.getName());
     this.appContext = appContext;
     this.eventHandler = eventHandler;
@@ -135,13 +148,39 @@ public class TaskSchedulerEventHandler extends AbstractService
     this.containerSignatureMatcher = containerSignatureMatcher;
     this.webUI = webUI;
     this.historyUrl = getHistoryUrl();
+    this.isPureLocalMode = isPureLocalMode;
     if (this.webUI != null) {
       this.webUI.setHistoryUrl(this.historyUrl);
     }
-    if (schedulerClasses == null || schedulerClasses.length == 0) {
-      this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+
+    // Override everything for pure local mode
+    if (isPureLocalMode) {
+      this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+      this.yarnTaskSchedulerIndex = -1;
     } else {
-      this.taskSchedulerClasses = schedulerClasses;
+      if (schedulerClasses == null || schedulerClasses.length ==0) {
+        this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+        this.yarnTaskSchedulerIndex = 0;
+      } else {
+        // Ensure the YarnScheduler will be setup and note it's index. This will be responsible for heartbeats and YARN registration.
+        int foundYarnTaskSchedulerIndex = -1;
+        for (int i = 0 ; i < schedulerClasses.length ; i++) {
+          if (schedulerClasses[i].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+            foundYarnTaskSchedulerIndex = i;
+            break;
+          }
+        }
+        if (foundYarnTaskSchedulerIndex == -1) { // Not found. Add at the end.
+          this.taskSchedulerClasses = new String[schedulerClasses.length+1];
+          foundYarnTaskSchedulerIndex = this.taskSchedulerClasses.length -1;
+          for (int i = 0 ; i < schedulerClasses.length ; i++) { // Copy over the rest.
+            this.taskSchedulerClasses[i] = schedulerClasses[i];
+          }
+        } else {
+          this.taskSchedulerClasses = schedulerClasses;
+        }
+        this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
+      }
     }
     taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length];
   }
@@ -159,12 +198,12 @@ public class TaskSchedulerEventHandler extends AbstractService
     return cachedNodeCount;
   }
   
-  public Resource getAvailableResources() {
-    return taskSchedulers[0].getAvailableResources();
+  public Resource getAvailableResources(int schedulerId) {
+    return taskSchedulers[schedulerId].getAvailableResources();
   }
 
-  public Resource getTotalResources() {
-    return taskSchedulers[0].getTotalResources();
+  public Resource getTotalResources(int schedulerId) {
+    return taskSchedulers[schedulerId].getTotalResources();
   }
 
   public synchronized void handleEvent(AMSchedulerEvent sEvent) {
@@ -178,7 +217,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       switch(event.getState()) {
       case FAILED:
       case KILLED:
-        handleTAUnsuccessfulEnd((AMSchedulerEventTAEnded) sEvent);
+        handleTAUnsuccessfulEnd(event);
         break;
       case SUCCEEDED:
         handleTASucceeded(event);
@@ -230,9 +269,9 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
     if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
-      taskSchedulers[0].blacklistNode(event.getNodeId());
+      taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
     } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
-      taskSchedulers[0].unblacklistNode(event.getNodeId());
+      taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
     } else {
       throw new TezUncheckedException("Invalid event type: " + event.getType());
     }
@@ -244,14 +283,14 @@ public class TaskSchedulerEventHandler extends AbstractService
     // TODO what happens to the task that was connected to this container?
     // current assumption is that it will eventually call handleTaStopRequest
     //TaskAttempt taskAttempt = (TaskAttempt)
-    taskSchedulers[0].deallocateContainer(containerId);
+    taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
     // TODO does this container need to be stopped via C_STOP_REQUEST
     sendEvent(new AMContainerEventStopRequest(containerId));
   }
 
   private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
     TaskAttempt attempt = event.getAttempt();
-    boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, false);
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, false);
     // use stored value of container id in case the scheduler has removed this
     // assignment because the task has been deallocated earlier.
     // retroactive case
@@ -293,7 +332,8 @@ public class TaskSchedulerEventHandler extends AbstractService
           event.getAttemptID()));
     }
 
-    boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, true);
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
+        true);
     if (!wasContainerAllocated) {
       LOG.error("De-allocated successful task: " + attempt.getID()
           + ", but TaskScheduler reported no container assigned to task");
@@ -318,7 +358,7 @@ public class TaskSchedulerEventHandler extends AbstractService
         TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
         if (affinityAttempt != null) {
           Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
-          taskSchedulers[0].allocateTask(taskAttempt,
+          taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
               event.getCapability(),
               affinityAttempt.getAssignedContainerID(),
               Priority.newInstance(event.getPriority()),
@@ -338,7 +378,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       }
     }
 
-    taskSchedulers[0].allocateTask(taskAttempt,
+    taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
         event.getCapability(),
         hosts,
         racks,
@@ -349,7 +389,8 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
                                                    AppContext appContext,
-                                                   String schedulerClassName) {
+                                                   String schedulerClassName,
+                                                   long customAppIdIdentifier) {
     if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
       return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
@@ -357,7 +398,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Creating TaskScheduler: Local TaskScheduler");
       return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
-          host, port, trackingUrl, appContext);
+          host, port, trackingUrl, customAppIdIdentifier, appContext);
     } else {
       LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
       // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface.
@@ -366,9 +407,10 @@ public class TaskSchedulerEventHandler extends AbstractService
       try {
         Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
             .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
-                int.class, String.class, Configuration.class);
+                int.class, String.class, long.class, Configuration.class);
         ctor.setAccessible(true);
-        return ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+        return ctor.newInstance(this, appContext, host, port, trackingUrl, customAppIdIdentifier,
+            getConfig());
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
       } catch (InvocationTargetException e) {
@@ -383,10 +425,19 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   @VisibleForTesting
   protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+    // TODO Add error checking for components being used in the Vertex when running in pure local mode.
     // Iterate over the list and create all the taskSchedulers
+    int j = 0;
     for (int i = 0; i < taskSchedulerClasses.length; i++) {
+      long customAppIdIdentifier;
+      if (isPureLocalMode || taskSchedulerClasses[i].equals(
+          TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { // Use the app identifier from the appId.
+        customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
+      } else {
+        customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
+      }
       taskSchedulers[i] = createTaskScheduler(host, port,
-          trackingUrl, appContext, taskSchedulerClasses[i]);
+          trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier);
     }
   }
 
@@ -405,12 +456,12 @@ public class TaskSchedulerEventHandler extends AbstractService
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
       taskSchedulers[i].init(getConfig());
       taskSchedulers[i].start();
-    }
-
-    // TODO TEZ-2118 Start using multiple task schedulers
-    if (shouldUnregisterFlag.get()) {
-      // Flag may have been set earlier when task scheduler was not initialized
-      taskSchedulers[0].setShouldUnregister();
+      if (shouldUnregisterFlag.get()) {
+        // Flag may have been set earlier when task scheduler was not initialized
+        // TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ?
+        // External services could need to talk to some other entity.
+        taskSchedulers[i].setShouldUnregister();
+      }
     }
 
     this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
@@ -459,8 +510,10 @@ public class TaskSchedulerEventHandler extends AbstractService
       if (eventHandlingThread != null)
         eventHandlingThread.interrupt();
     }
-    if (taskSchedulers[0] != null) {
-      ((AbstractService)taskSchedulers[0]).stop();
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      if (taskSchedulers[i] != null) {
+        taskSchedulers[i].stop();
+      }
     }
   }
 
@@ -469,15 +522,18 @@ public class TaskSchedulerEventHandler extends AbstractService
   public synchronized void taskAllocated(Object task,
                                            Object appCookie,
                                            Container container) {
+    AMSchedulerEventTALaunchRequest event =
+        (AMSchedulerEventTALaunchRequest) appCookie;
     ContainerId containerId = container.getId();
-    if (appContext.getAllContainers().addContainerIfNew(container)) {
+    if (appContext.getAllContainers()
+        .addContainerIfNew(container, event.getSchedulerId(), event.getLauncherId(),
+            event.getTaskCommId())) {
       appContext.getNodeTracker().nodeSeen(container.getNodeId());
       sendEvent(new AMNodeEventContainerAllocated(container
           .getNodeId(), container.getId()));
     }
 
-    AMSchedulerEventTALaunchRequest event =
-                         (AMSchedulerEventTALaunchRequest) appCookie;
+
     TaskAttempt taskAttempt = event.getTaskAttempt();
     // TODO - perhaps check if the task still needs this container
     // because the deallocateTask downcall may have raced with the
@@ -486,7 +542,7 @@ public class TaskSchedulerEventHandler extends AbstractService
  
     if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
       sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
-          event.getContainerContext()));
+          event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
     }
     sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
     sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
@@ -605,6 +661,9 @@ public class TaskSchedulerEventHandler extends AbstractService
   public float getProgress() {
     // at this point allocate has been called and so node count must be available
     // may change after YARN-1722
+    // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and
+    // node updates from the cluster.
+    // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
     int nodeCount = taskSchedulers[0].getClusterNodeCount();
     if (nodeCount != cachedNodeCount) {
       cachedNodeCount = nodeCount;
@@ -620,12 +679,17 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   public void dagCompleted() {
-    taskSchedulers[0].resetMatchLocalityForAllHeldContainers();
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      taskSchedulers[i].resetMatchLocalityForAllHeldContainers();
+    }
   }
 
   @Override
   public void preemptContainer(ContainerId containerId) {
-    taskSchedulers[0].deallocateContainer(containerId);
+    // TODO Why is this making a call back into the scheduler, when the call is originating from there.
+    // An AMContainer instance should already exist if an attempt is being made to preempt it
+    AMContainer amContainer = appContext.getAllContainers().get(containerId);
+    taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
     // Inform the Containers about completion.
     sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
         "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
@@ -634,13 +698,24 @@ public class TaskSchedulerEventHandler extends AbstractService
   public void setShouldUnregisterFlag() {
     LOG.info("TaskScheduler notified that it should unregister from RM");
     this.shouldUnregisterFlag.set(true);
-    if (this.taskSchedulers[0] != null) {
-      this.taskSchedulers[0].setShouldUnregister();
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      if (this.taskSchedulers[i] != null) {
+        // TODO TEZ-2003 registration required for all schedulers ?
+        this.taskSchedulers[i].setShouldUnregister();
+      }
     }
   }
 
   public boolean hasUnregistered() {
-    return this.taskSchedulers[0].hasUnregistered();
+    boolean result = true;
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      // TODO TEZ-2003 registration required for all schedulers ?
+      result |= this.taskSchedulers[i].hasUnregistered();
+      if (result == false) {
+        return result;
+      }
+    }
+    return result;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index 0fc2e12..6616896 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -34,4 +34,7 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
   public List<TezTaskAttemptID> getAllTaskAttempts();
   public TezTaskAttemptID getCurrentTaskAttempt();
 
+  public int getTaskSchedulerIdentifier();
+  public int getContainerLauncherIdentifier();
+  public int getTaskCommunicatorIdentifier();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
index d973264..92e5817 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
@@ -27,12 +27,17 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
 
   private final TezVertexID vertexId;
   private final ContainerContext containerContext;
+  private final int launcherId;
+  private final int taskCommId;
 
   public AMContainerEventLaunchRequest(ContainerId containerId,
-      TezVertexID vertexId, ContainerContext containerContext) {
+      TezVertexID vertexId, ContainerContext containerContext,
+      int launcherId, int taskCommId) {
     super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
     this.vertexId = vertexId;
     this.containerContext = containerContext;
+    this.launcherId = launcherId;
+    this.taskCommId = taskCommId;
   }
 
   public TezDAGID getDAGId() {
@@ -46,4 +51,12 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
   public ContainerContext getContainerContext() {
     return this.containerContext;
   }
+
+  public int getLauncherId() {
+    return launcherId;
+  }
+
+  public int getTaskCommId() {
+    return taskCommId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 1acec9c..39df2e8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -81,6 +81,9 @@ public class AMContainerImpl implements AMContainer {
   private final TaskAttemptListener taskAttemptListener;
   protected final EventHandler eventHandler;
   private final ContainerSignatureMatcher signatureMatcher;
+  private final int schedulerId;
+  private final int launcherId;
+  private final int taskCommId;
 
   private final List<TezTaskAttemptID> completedAttempts =
       new LinkedList<TezTaskAttemptID>();
@@ -302,7 +305,7 @@ public class AMContainerImpl implements AMContainer {
   // additional change - JvmID, YarnChild, etc depend on TaskType.
   public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
       TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher,
-      AppContext appContext) {
+      AppContext appContext, int schedulerId, int launcherId, int taskCommId) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -314,6 +317,9 @@ public class AMContainerImpl implements AMContainer {
     this.containerHeartbeatHandler = chh;
     this.taskAttemptListener = tal;
     this.failedAssignments = new LinkedList<TezTaskAttemptID>();
+    this.schedulerId = schedulerId;
+    this.launcherId = launcherId;
+    this.taskCommId = taskCommId;
     this.stateMachine = stateMachineFactory.make(this);
   }
 
@@ -363,6 +369,21 @@ public class AMContainerImpl implements AMContainer {
     }
   }
 
+  @Override
+  public int getTaskSchedulerIdentifier() {
+    return this.schedulerId;
+  }
+
+  @Override
+  public int getContainerLauncherIdentifier() {
+    return this.launcherId;
+  }
+
+  @Override
+  public int getTaskCommunicatorIdentifier() {
+    return this.taskCommId;
+  }
+
   public boolean isInErrorState() {
     return inError;
   }
@@ -432,7 +453,7 @@ public class AMContainerImpl implements AMContainer {
           containerContext.getLocalResources(),
           containerContext.getEnvironment(),
           containerContext.getJavaOpts(),
-          container.taskAttemptListener.getAddress(), containerContext.getCredentials(),
+          container.taskAttemptListener.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(),
           container.appContext, container.container.getResource(),
           container.appContext.getAMConf());
 
@@ -1014,7 +1035,7 @@ public class AMContainerImpl implements AMContainer {
   }
   
   protected void deAllocate() {
-    sendEvent(new AMSchedulerEventDeallocateContainer(containerId));
+    sendEvent(new AMSchedulerEventDeallocateContainer(containerId, schedulerId));
   }
 
   protected void sendTerminatedToTaskAttempt(
@@ -1044,28 +1065,28 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected void sendStartRequestToNM(ContainerLaunchContext clc) {
-    sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container));
+    sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, taskCommId));
   }
 
   protected void sendStopRequestToNM() {
     sendEvent(new NMCommunicatorStopRequestEvent(containerId,
-        container.getNodeId(), container.getContainerToken()));
+        container.getNodeId(), container.getContainerToken(), launcherId));
   }
 
   protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
-    taskAttemptListener.unregisterTaskAttempt(attemptId);
+    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId);
   }
 
   protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
-    taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId, taskCommId);
   }
 
   protected void registerWithTAListener() {
-    taskAttemptListener.registerRunningContainer(containerId);
+    taskAttemptListener.registerRunningContainer(containerId, taskCommId);
   }
 
   protected void unregisterFromTAListener() {
-    this.taskAttemptListener.unregisterRunningContainer(containerId);
+    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId);
   }
 
   protected void registerWithContainerListener() {

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 6037a3a..d67de3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -61,9 +61,9 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
     }
   }
 
-  public boolean addContainerIfNew(Container container) {
+  public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) {
     AMContainer amc = new AMContainerImpl(container, chh, tal,
-      containerSignatureMatcher, context);
+      containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
     return (containerMap.putIfAbsent(container.getId(), amc) == null);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index b93cab3..0d8e4cd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -257,7 +257,8 @@ public class AMNodeImpl implements AMNode {
     // these containers are not useful anymore
     pastContainers.addAll(containers);
     containers.clear();
-    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true));
+    // TODO TEZ-2124 node tracking per ext source
+    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, 0));
   }
 
   @SuppressWarnings("unchecked")
@@ -363,7 +364,8 @@ public class AMNodeImpl implements AMNode {
     public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
       node.ignoreBlacklisting = ignore;
       if (node.getState() == AMNodeState.BLACKLISTED) {
-        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false));
+        // TODO TEZ-2124 node tracking per ext source
+        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, 0));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index fb95106..b3639a3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -176,7 +176,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     @Override
     public void serviceStart() throws Exception {
       taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
-      taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator();
+      taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(0);
       eventHandlingThread = new Thread(this);
       eventHandlingThread.start();
       ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency,

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index d98e5d1..318ee4e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -74,7 +74,7 @@ public class TestTaskAttemptListenerImplTezDag {
     TaskAttemptListenerImpTezDag taskAttemptListener =
         new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
             mock(ContainerHeartbeatHandler.class), null, null);
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
+    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
 
@@ -93,37 +93,37 @@ public class TestTaskAttemptListenerImplTezDag {
 
     ContainerId containerId2 = createContainerId(appId, 2);
     ContainerContext containerContext2 = new ContainerContext(containerId2.toString());
-    taskAttemptListener.registerRunningContainer(containerId2);
+    taskAttemptListener.registerRunningContainer(containerId2, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
     // Valid task registered
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertFalse(containerTask.shouldDie());
     assertEquals(taskSpec, containerTask.getTaskSpec());
 
     // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptId);
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptId, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
     // Container unregistered. Should send a shouldDie = true
-    taskAttemptListener.unregisterRunningContainer(containerId2);
+    taskAttemptListener.unregisterRunningContainer(containerId2, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertTrue(containerTask.shouldDie());
 
     ContainerId containerId3 = createContainerId(appId, 3);
     ContainerContext containerContext3 = new ContainerContext(containerId3.toString());
-    taskAttemptListener.registerRunningContainer(containerId3);
+    taskAttemptListener.registerRunningContainer(containerId3, 0);
 
     // Register task to container3, followed by unregistering container 3 all together
     TaskSpec taskSpec2 = mock(TaskSpec.class);
     TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
     doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
     AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
-    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3);
-    taskAttemptListener.unregisterRunningContainer(containerId3);
+    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
+    taskAttemptListener.unregisterRunningContainer(containerId3, 0);
     containerTask = tezUmbilical.getTask(containerContext3);
     assertTrue(containerTask.shouldDie());
   }
@@ -154,7 +154,7 @@ public class TestTaskAttemptListenerImplTezDag {
     TaskAttemptListenerImpTezDag taskAttemptListener =
         new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
             mock(ContainerHeartbeatHandler.class), null, null);
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
+    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
 
@@ -167,12 +167,12 @@ public class TestTaskAttemptListenerImplTezDag {
 
     ContainerId containerId1 = createContainerId(appId, 1);
     ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
-    taskAttemptListener.registerRunningContainer(containerId1);
+    taskAttemptListener.registerRunningContainer(containerId1, 0);
     containerTask = tezUmbilical.getTask(containerContext1);
     assertNull(containerTask);
 
     // Register task
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0);
     containerTask = tezUmbilical.getTask(containerContext1);
     assertFalse(containerTask.shouldDie());
     assertEquals(taskSpec, containerTask.getTaskSpec());

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 224976c..06d8498 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -272,8 +273,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -322,8 +324,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -344,7 +347,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -423,8 +426,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -445,7 +449,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -488,8 +492,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -510,7 +515,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -580,8 +585,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -603,7 +609,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -682,8 +688,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -705,7 +712,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -774,8 +781,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -796,7 +804,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -869,8 +877,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -891,7 +900,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -972,8 +981,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -994,7 +1004,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -1072,8 +1082,9 @@ public class TestTaskAttempt {
     MockEventHandler mockEh = new MockEventHandler();
     MockEventHandler eventHandler = spy(mockEh);
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1094,7 +1105,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 5ad320e..98ebc68 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2166,7 +2166,7 @@ public class TestVertexImpl {
     doReturn(dagId).when(appContext).getCurrentDAGID();
     doReturn(dagId).when(dag).getID();
     doReturn(taskScheduler).when(appContext).getTaskScheduler();
-    doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
+    doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources(0);
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
 
@@ -2945,7 +2945,7 @@ public class TestVertexImpl {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appContext);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
@@ -2981,7 +2981,7 @@ public class TestVertexImpl {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appContext);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
@@ -3018,7 +3018,7 @@ public class TestVertexImpl {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appContext);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));

http://git-wip-us.apache.org/repos/asf/tez/blob/5c2818e0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 54b9adb..c1169ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -223,7 +223,7 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(
-        ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED));
+        ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler, times(1)).taskAllocated(
@@ -235,7 +235,7 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
 
     long currentTs = System.currentTimeMillis();
     Throwable exception = null;
@@ -356,7 +356,7 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
@@ -459,7 +459,7 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
 
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1));
@@ -469,7 +469,7 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta12), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1));
@@ -478,7 +478,7 @@ public class TestContainerReuse {
     eventHandler.reset();
 
     // Verify no re-use if a previous task fails.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, 0));
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1));
     verify(taskScheduler).deallocateTask(eq(ta13), eq(false));
@@ -496,7 +496,7 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
 
     // Task assigned to container completed successfully. No pending requests. Container should be released.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta14), eq(true));
     verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
@@ -607,7 +607,7 @@ public class TestContainerReuse {
 
     // First task had profiling on. This container can not be reused further.
     taskSchedulerEventHandler.handleEvent(
-        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
+        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class),
@@ -653,7 +653,7 @@ public class TestContainerReuse {
 
     // Verify that the container can not be reused when profiling option is turned on
     // Even for 2 tasks having same profiling option can have container reusability.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta13), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class),
@@ -698,7 +698,7 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3));
 
     //Ensure task 6 (of vertex 1) is allocated to same container
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta15), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3));
@@ -811,7 +811,7 @@ public class TestContainerReuse {
     // until delay expires.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta11, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
@@ -828,7 +828,7 @@ public class TestContainerReuse {
     // TA12 completed.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta12, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(3000l);
@@ -946,7 +946,7 @@ public class TestContainerReuse {
     // Container should  be assigned to task21.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta11, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(
@@ -956,7 +956,7 @@ public class TestContainerReuse {
     // Task 2 completes.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1065,7 +1065,7 @@ public class TestContainerReuse {
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
     
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta111), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
@@ -1077,7 +1077,7 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta112), eq(true));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -1118,7 +1118,7 @@ public class TestContainerReuse {
     assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
     eventHandler.reset();
 
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta211), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1));


Mime
View raw message