tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject [2/3] tez git commit: TEZ-3770. DAG-aware YARN task scheduler (jlowe)
Date Thu, 25 Jan 2018 15:44:09 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/a9b8bb5a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
index 49ab77d..5e45e70 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
@@ -187,6 +187,11 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
     return real.getAMState();
   }
 
+  @Override
+  public int getVertexIndexForTask(Object task) {
+    return real.getVertexIndexForTask(task);
+  }
+
   // End of getters which do not need to go through a thread. Underlying implementation
   // does not use locks.
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a9b8bb5a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 7c1b926..5777a2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -139,6 +139,8 @@ public class TaskSchedulerManager extends AbstractService implements
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
 
+  private final String yarnSchedulerClassName;
+
   // Not tracking container / task to schedulerId. Instead relying on everything flowing through
   // the system and being propagated back via events.
 
@@ -164,6 +166,8 @@ public class TaskSchedulerManager extends AbstractService implements
     this.historyUrl = null;
     this.isLocalMode = false;
     this.hadoopShim = new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim();
+    this.yarnSchedulerClassName = appContext.getAMConf().get(TezConfiguration.TEZ_AM_YARN_SCHEDULER_CLASS,
+        TezConfiguration.TEZ_AM_YARN_SCHEDULER_CLASS_DEFAULT);
   }
 
   /**
@@ -196,6 +200,8 @@ public class TaskSchedulerManager extends AbstractService implements
     this.historyUrl = getHistoryUrl();
     this.isLocalMode = isLocalMode;
     this.hadoopShim = hadoopShim;
+    this.yarnSchedulerClassName = appContext.getAMConf().get(TezConfiguration.TEZ_AM_YARN_SCHEDULER_CLASS,
+        TezConfiguration.TEZ_AM_YARN_SCHEDULER_CLASS_DEFAULT);
     this.appCallbackExecutor = createAppCallbackExecutorService();
     if (this.webUI != null) {
       this.webUI.setHistoryUrl(this.historyUrl);
@@ -574,9 +580,11 @@ public class TaskSchedulerManager extends AbstractService implements
 
   @VisibleForTesting
   TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext,
-                                        int schedulerId) {
-    LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
-    return new YarnTaskSchedulerService(taskSchedulerContext);
+                                        int schedulerId) throws TezException {
+    LOG.info("Creating YARN TaskScheduler: {}", yarnSchedulerClassName);
+    return ReflectionUtils.createClazzInstance(yarnSchedulerClassName,
+        new Class[] { TaskSchedulerContext.class },
+        new Object[] { taskSchedulerContext });
   }
 
   @VisibleForTesting
@@ -888,11 +896,10 @@ public class TaskSchedulerManager extends AbstractService implements
       LOG.info("Error reported by scheduler {} - {}",
           Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext) + ": " +
               diagnostics);
-      if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName()
-          .equals(YarnTaskSchedulerService.class.getName())) {
+      if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName().equals(yarnSchedulerClassName)) {
         LOG.warn(
             "Reporting a SchedulerServiceError to the DAGAppMaster since the error" +
-                " was reported by the default YARN Task Scheduler");
+                " was reported by the YARN task scheduler");
         sendEvent(new DAGAppMasterEventSchedulingServiceError(diagnostics));
       }
     } else if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
@@ -934,6 +941,11 @@ public class TaskSchedulerManager extends AbstractService implements
     // the context has updated information.
   }
 
+  public int getVertexIndexForTask(Object task) {
+    TaskAttempt attempt = (TaskAttempt) task;
+    return attempt.getVertexID().getId();
+  }
+
   public void preemptContainer(int schedulerId, ContainerId containerId) {
     // TODO Why is this making a call back into the scheduler, when the call is originating from there.
     // An AMContainer instance should already exist if an attempt is being made to preempt it

http://git-wip-us.apache.org/repos/asf/tez/blob/a9b8bb5a/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
index d015714..c7cf74e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
@@ -20,9 +20,21 @@ package org.apache.tez.dag.app;
 
 import org.apache.hadoop.yarn.util.Clock;
 
+import java.util.Collection;
+import java.util.LinkedList;
+
 public class MockClock implements Clock {
   
-  long time = 1000;
+  long time;
+  Collection<MockClockListener> listeners = new LinkedList<>();
+
+  public MockClock() {
+    this(1000);
+  }
+
+  public MockClock(long initTime) {
+    time = initTime;
+  }
 
   @Override
   public long getTime() {
@@ -31,6 +43,16 @@ public class MockClock implements Clock {
   
   public void incrementTime(long inc) {
     time += inc;
+    for (MockClockListener listener : listeners) {
+      listener.onTimeUpdated(time);
+    }
+  }
+
+  public void register(MockClockListener listener) {
+    listeners.add(listener);
   }
 
+  public interface MockClockListener {
+    void onTimeUpdated(long newTime);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a9b8bb5a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
new file mode 100644
index 0000000..553306f
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
@@ -0,0 +1,1510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.MockClock;
+import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.AMRMClientAsyncWrapper;
+import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.HeldContainer;
+import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.TaskRequest;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
+import org.apache.tez.test.ControlledScheduledExecutorService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.createCountingExecutingService;
+import static org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.setupMockTaskSchedulerContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestDagAwareYarnTaskScheduler {
+  private ExecutorService contextCallbackExecutor;
+
+  @BeforeClass
+  public static void beforeClass() {
+
+    MockDNSToSwitchMapping.initializeMockRackResolver();
+  }
+
+  @Before
+  public void preTest() {
+    contextCallbackExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
+            .setDaemon(true)
+            .build());
+  }
+
+  @After
+  public void postTest() {
+    contextCallbackExecutor.shutdownNow();
+  }
+
+  private TaskSchedulerContextDrainable createDrainableContext(
+      TaskSchedulerContext taskSchedulerContext) {
+    TaskSchedulerContextImplWrapper wrapper =
+        new TaskSchedulerContextImplWrapper(taskSchedulerContext,
+            createCountingExecutingService(contextCallbackExecutor));
+    return new TaskSchedulerContextDrainable(wrapper);
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  @Test(timeout=30000)
+  public void testNoReuse() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(10);
+    when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    Object mockTask1 = new MockTask("task1");
+    Object mockCookie1 = new Object();
+    Resource mockCapability = Resources.createResource(1024, 1);
+    String[] hosts = {"host1", "host5"};
+    String[] racks = {"/default-rack", "/default-rack"};
+    Priority mockPriority = Priority.newInstance(1);
+    ArgumentCaptor<TaskRequest> requestCaptor =
+        ArgumentCaptor.forClass(TaskRequest.class);
+    // allocate task
+    scheduler.allocateTask(mockTask1, mockCapability, hosts,
+        racks, mockPriority, null, mockCookie1);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).
+        addContainerRequest(any(TaskRequest.class));
+
+    // returned from task requests before allocation happens
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
+    verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, times(1)).
+        removeContainerRequest(any(TaskRequest.class));
+    verify(mockRMClient, times(0)).
+        releaseAssignedContainer((ContainerId) any());
+
+    // deallocating unknown task
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
+    verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, times(1)).
+        removeContainerRequest(any(TaskRequest.class));
+    verify(mockRMClient, times(0)).
+        releaseAssignedContainer((ContainerId) any());
+
+    // allocate tasks
+    Object mockTask2 = new MockTask("task2");
+    Object mockCookie2 = new Object();
+    Object mockTask3 = new MockTask("task3");
+    Object mockCookie3 = new Object();
+    scheduler.allocateTask(mockTask1, mockCapability, hosts,
+        racks, mockPriority, null, mockCookie1);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(2)).
+        addContainerRequest(requestCaptor.capture());
+    TaskRequest request1 = requestCaptor.getValue();
+    scheduler.allocateTask(mockTask2, mockCapability, hosts,
+        racks, mockPriority, null, mockCookie2);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(3)).
+        addContainerRequest(requestCaptor.capture());
+    TaskRequest request2 = requestCaptor.getValue();
+    scheduler.allocateTask(mockTask3, mockCapability, hosts,
+        racks, mockPriority, null, mockCookie3);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(4)).
+        addContainerRequest(requestCaptor.capture());
+    TaskRequest request3 = requestCaptor.getValue();
+
+    NodeId host1 = NodeId.newInstance("host1", 1);
+    NodeId host2 = NodeId.newInstance("host2", 2);
+    NodeId host3 = NodeId.newInstance("host3", 3);
+    NodeId host4 = NodeId.newInstance("host4", 4);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId mockCId1 = ContainerId.newContainerId(attemptId, 1);
+    Container mockContainer1 = Container.newInstance(mockCId1, host1, null, mockCapability, mockPriority, null);
+    ContainerId mockCId2 = ContainerId.newContainerId(attemptId, 2);
+    Container mockContainer2 = Container.newInstance(mockCId2, host2, null, mockCapability, mockPriority, null);
+    ContainerId mockCId3 = ContainerId.newContainerId(attemptId, 3);
+    Container mockContainer3 = Container.newInstance(mockCId3, host3, null, mockCapability, mockPriority, null);
+    ContainerId mockCId4 = ContainerId.newContainerId(attemptId, 4);
+    Container mockContainer4 = Container.newInstance(mockCId4, host4, null, mockCapability, mockPriority, null);
+    List<Container> containers = new ArrayList<>();
+    containers.add(mockContainer1);
+    containers.add(mockContainer2);
+    containers.add(mockContainer3);
+    containers.add(mockContainer4);
+    scheduler.onContainersAllocated(containers);
+    drainableAppCallback.drain();
+    // first container allocated
+    verify(mockApp).taskAllocated(mockTask1, mockCookie1, mockContainer1);
+    verify(mockApp).taskAllocated(mockTask2, mockCookie2, mockContainer2);
+    verify(mockApp).taskAllocated(mockTask3, mockCookie3, mockContainer3);
+    // no other allocations returned
+    verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+    verify(mockRMClient).removeContainerRequest(request1);
+    verify(mockRMClient).removeContainerRequest(request2);
+    verify(mockRMClient).removeContainerRequest(request3);
+    // verify unwanted container released
+    verify(mockRMClient).releaseAssignedContainer(mockCId4);
+
+    // deallocate allocated task
+    assertTrue(scheduler.deallocateTask(mockTask1, true, null, null));
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(mockCId1);
+    verify(mockRMClient).releaseAssignedContainer(mockCId1);
+    // deallocate allocated container
+    assertEquals(mockTask2, scheduler.deallocateContainer(mockCId2));
+    drainableAppCallback.drain();
+    verify(mockRMClient).releaseAssignedContainer(mockCId2);
+    verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+
+    List<ContainerStatus> statuses = new ArrayList<>();
+    ContainerStatus mockStatus1 = mock(ContainerStatus.class);
+    when(mockStatus1.getContainerId()).thenReturn(mockCId1);
+    statuses.add(mockStatus1);
+    ContainerStatus mockStatus2 = mock(ContainerStatus.class);
+    when(mockStatus2.getContainerId()).thenReturn(mockCId2);
+    statuses.add(mockStatus2);
+    ContainerStatus mockStatus3 = mock(ContainerStatus.class);
+    when(mockStatus3.getContainerId()).thenReturn(mockCId3);
+    statuses.add(mockStatus3);
+    ContainerStatus mockStatus4 = mock(ContainerStatus.class);
+    when(mockStatus4.getContainerId()).thenReturn(mockCId4);
+    statuses.add(mockStatus4);
+
+    scheduler.onContainersCompleted(statuses);
+    drainableAppCallback.drain();
+    // released container status returned
+    verify(mockApp).containerCompleted(mockTask1, mockStatus1);
+    verify(mockApp).containerCompleted(mockTask2, mockStatus2);
+    // currently allocated container status returned and not released
+    verify(mockApp).containerCompleted(mockTask3, mockStatus3);
+    // no other statuses returned
+    verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
+    verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+
+    // verify blacklisting
+    verify(mockRMClient, times(0)).updateBlacklist(anyListOf(String.class), anyListOf(String.class));
+    String badHost = "host6";
+    NodeId badNodeId = NodeId.newInstance(badHost, 1);
+    scheduler.blacklistNode(badNodeId);
+    List<String> badNodeList = Collections.singletonList(badHost);
+    verify(mockRMClient, times(1)).updateBlacklist(eq(badNodeList), isNull(List.class));
+    Object mockTask4 = new MockTask("task4");
+    Object mockCookie4 = new Object();
+    scheduler.allocateTask(mockTask4, mockCapability, null,
+        null, mockPriority, null, mockCookie4);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
+    ContainerId mockCId5 = ContainerId.newContainerId(attemptId, 5);
+    Container mockContainer5 = Container.newInstance(mockCId5, badNodeId, null, mockCapability, mockPriority, null);
+    containers.clear();
+    containers.add(mockContainer5);
+    scheduler.onContainersAllocated(containers);
+    drainableAppCallback.drain();
+    // no new allocation
+    verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+    // verify blacklisted container released
+    verify(mockRMClient).releaseAssignedContainer(mockCId5);
+    verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
+    // verify request added back
+    verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
+    NodeId host6 = NodeId.newInstance("host6", 6);
+    ContainerId mockCId6 = ContainerId.newContainerId(attemptId, 6);
+    Container mockContainer6 = Container.newInstance(mockCId6, host6, null, mockCapability, mockPriority, null);
+    containers.clear();
+    containers.add(mockContainer6);
+    scheduler.onContainersAllocated(containers);
+    drainableAppCallback.drain();
+    // new allocation
+    verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+    verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
+    // deallocate allocated task
+    assertTrue(scheduler.deallocateTask(mockTask4, true, null, null));
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(mockCId6);
+    verify(mockRMClient).releaseAssignedContainer(mockCId6);
+    verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+    // test unblacklist
+    scheduler.unblacklistNode(badNodeId);
+    verify(mockRMClient, times(1)).updateBlacklist(isNull(List.class), eq(badNodeList));
+    assertEquals(0, scheduler.getNumBlacklistedNodes());
+
+    float progress = 0.5f;
+    when(mockApp.getProgress()).thenReturn(progress);
+    assertEquals(progress, scheduler.getProgress(), 0);
+
+    // check duplicate allocation request
+    scheduler.allocateTask(mockTask1, mockCapability, hosts, racks,
+        mockPriority, null, mockCookie1);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(7)).addContainerRequest(any(TaskRequest.class));
+    verify(mockRMClient, times(6)).
+        removeContainerRequest(any(TaskRequest.class));
+    scheduler.allocateTask(mockTask1, mockCapability, hosts, racks,
+        mockPriority, null, mockCookie1);
+    drainableAppCallback.drain();
+    // old request removed and new one added
+    verify(mockRMClient, times(7)).
+        removeContainerRequest(any(TaskRequest.class));
+    verify(mockRMClient, times(8)).addContainerRequest(any(TaskRequest.class));
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
+
+    List<NodeReport> mockUpdatedNodes = mock(List.class);
+    scheduler.onNodesUpdated(mockUpdatedNodes);
+    drainableAppCallback.drain();
+    verify(mockApp).nodesUpdated(mockUpdatedNodes);
+
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+    Exception mockException = new IOException("mockexception");
+    scheduler.onError(mockException);
+    drainableAppCallback.drain();
+    verify(mockApp)
+        .reportError(eq(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR), argumentCaptor.capture(),
+            any(DagInfo.class));
+    assertTrue(argumentCaptor.getValue().contains("mockexception"));
+
+    scheduler.onShutdownRequest();
+    drainableAppCallback.drain();
+    verify(mockApp).appShutdownRequested();
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+        unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+            appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+
+  @Test(timeout=30000)
+  public void testSimpleReuseLocalMatching() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(10);
+    when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    Priority priorityv0 = Priority.newInstance(1);
+    Priority priorityv1 = Priority.newInstance(2);
+    String[] hostsv0t0 = { "host1", "host2" };
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, hostsv0t0);
+    MockTaskInfo taskv0t1 = new MockTaskInfo("taskv0t1", priorityv0, "host3");
+    MockTaskInfo taskv0t2 = new MockTaskInfo("taskv0t2", priorityv0, hostsv0t0);
+    MockTaskInfo taskv1t0 = new MockTaskInfo("taskv1t0", priorityv1, hostsv0t0);
+    MockTaskInfo taskv1t1 = new MockTaskInfo("taskv1t1", priorityv1, hostsv0t0);
+
+    TaskRequestCaptor taskRequestCaptor = new TaskRequestCaptor(mockRMClient,
+        scheduler, drainableAppCallback);
+    TaskRequest reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0);
+    taskRequestCaptor.scheduleTask(taskv0t1);
+    TaskRequest reqv0t2 = taskRequestCaptor.scheduleTask(taskv0t2);
+    TaskRequest reqv1t0 = taskRequestCaptor.scheduleTask(taskv1t0);
+    taskRequestCaptor.scheduleTask(taskv1t1);
+
+    NodeId host1 = NodeId.newInstance("host1", 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    Container container1 = Container.newInstance(cid1, host1, null, taskv0t0.capability, priorityv0, null);
+
+    // allocate one container at v0 priority
+    scheduler.onContainersAllocated(Collections.singletonList(container1));
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv0t0.task, taskv0t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t0);
+
+    // finish v0t0 successfully, verify v0t1 is skipped and v0t2 instead is assigned to the container
+    assertTrue(scheduler.deallocateTask(taskv0t0.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv0t2.task, taskv0t2.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t2);
+
+    // finish v0t2 successfully, verify v1t0 is assigned to the same container
+    assertTrue(scheduler.deallocateTask(taskv0t2.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv1t0.task, taskv1t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv1t0);
+
+    // fail v1t0 and verify container is released instead of reused for v1t1
+    assertTrue(scheduler.deallocateTask(taskv1t0.task, false, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(cid1);
+    verify(mockRMClient).releaseAssignedContainer(cid1);
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+        unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+            appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+
+  @Test(timeout=30000)
+  public void testSimpleReuseRackMatching() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(10);
+    when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    Priority priorityv0 = Priority.newInstance(1);
+    Priority priorityv1 = Priority.newInstance(2);
+    String[] hostsv0t0 = { "host1", "host2" };
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, hostsv0t0);
+    MockTaskInfo taskv0t1 = new MockTaskInfo("taskv0t1", priorityv0, "host2");
+    MockTaskInfo taskv0t2 = new MockTaskInfo("taskv0t2", priorityv0, "host4", "/somerack");
+    MockTaskInfo taskv1t0 = new MockTaskInfo("taskv1t0", priorityv1, "host1");
+    MockTaskInfo taskv1t1 = new MockTaskInfo("taskv1t1", priorityv1, "host5");
+
+    TaskRequestCaptor taskRequestCaptor = new TaskRequestCaptor(mockRMClient,
+        scheduler, drainableAppCallback);
+    TaskRequest reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0);
+    TaskRequest reqv0t1 = taskRequestCaptor.scheduleTask(taskv0t1);
+    taskRequestCaptor.scheduleTask(taskv0t2);
+    TaskRequest reqv1t0 = taskRequestCaptor.scheduleTask(taskv1t0);
+    taskRequestCaptor.scheduleTask(taskv1t1);
+
+    NodeId host1 = NodeId.newInstance("host1", 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    Container container1 = Container.newInstance(cid1, host1, null, taskv0t0.capability, priorityv0, null);
+
+    // allocate one container at v0 priority
+    scheduler.onContainersAllocated(Collections.singletonList(container1));
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv0t0.task, taskv0t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t0);
+
+    // finish v0t0 successfully, verify v0t1 is skipped and v1t0 assigned instead
+    // since host locality is preferred to rack locality and lower priority vertex
+    // is not blocked by higher priority vertex
+    assertTrue(scheduler.deallocateTask(taskv0t0.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv1t0.task, taskv1t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv1t0);
+
+    // finish v1t0 successfully, verify v0t1 is assigned
+    assertTrue(scheduler.deallocateTask(taskv1t0.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv0t1.task, taskv0t1.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t1);
+
+    // fail v0t1 and verify container is released instead of reused for v1t1
+    assertTrue(scheduler.deallocateTask(taskv0t1.task, false, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(cid1);
+    verify(mockRMClient).releaseAssignedContainer(cid1);
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+        unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+            appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+
+  @Test(timeout=30000)
+  public void testSimpleReuseAnyMatching() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(10);
+    when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    Priority priorityv0 = Priority.newInstance(1);
+    Priority priorityv1 = Priority.newInstance(2);
+    String[] hostsv0t0 = { "host1", "host2" };
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, hostsv0t0);
+    MockTaskInfo taskv0t1 = new MockTaskInfo("taskv0t1", priorityv0, "host2");
+    MockTaskInfo taskv0t2 = new MockTaskInfo("taskv0t2", priorityv0, "host4", "/rack4");
+    MockTaskInfo taskv1t0 = new MockTaskInfo("taskv1t0", priorityv1, "host1");
+    MockTaskInfo taskv1t1 = new MockTaskInfo("taskv1t1", priorityv1, "host6", "/rack6");
+
+    TaskRequestCaptor taskRequestCaptor = new TaskRequestCaptor(mockRMClient,
+        scheduler, drainableAppCallback);
+    TaskRequest reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0);
+    TaskRequest reqv0t1 = taskRequestCaptor.scheduleTask(taskv0t1);
+    TaskRequest reqv0t2 = taskRequestCaptor.scheduleTask(taskv0t2);
+    TaskRequest reqv1t0 = taskRequestCaptor.scheduleTask(taskv1t0);
+    taskRequestCaptor.scheduleTask(taskv1t1);
+
+    NodeId host1 = NodeId.newInstance("host1", 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    Container container1 = Container.newInstance(cid1, host1, null, taskv0t0.capability, priorityv0, null);
+
+    // allocate one container at v0 priority
+    scheduler.onContainersAllocated(Collections.singletonList(container1));
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv0t0.task, taskv0t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t0);
+
+    // finish v0t0 successfully, verify v0t1 is skipped and v1t0 assigned instead
+    // since host locality is preferred to rack locality and lower priority vertex
+    // is not blocked by higher priority vertex
+    assertTrue(scheduler.deallocateTask(taskv0t0.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv1t0.task, taskv1t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv1t0);
+
+    // finish v1t0 successfully, verify v0t1 is assigned
+    assertTrue(scheduler.deallocateTask(taskv1t0.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv0t1.task, taskv0t1.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t1);
+
+    // finish v0t1 successfully, verify v0t2 is assigned
+    assertTrue(scheduler.deallocateTask(taskv0t1.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv0t2.task, taskv0t2.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t2);
+
+    // fail v0t2 and verify container is released instead of reused for v1t1
+    assertTrue(scheduler.deallocateTask(taskv0t2.task, false, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(cid1);
+    verify(mockRMClient).releaseAssignedContainer(cid1);
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+        unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+            appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+
+  @Test(timeout=30000)
+  public void testReuseWithAffinity() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(10);
+    when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    Priority priorityv0 = Priority.newInstance(1);
+    Priority priorityv1 = Priority.newInstance(2);
+    String[] hostsv0t0 = { "host1", "host2" };
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, hostsv0t0);
+    MockTaskInfo taskv0t1 = new MockTaskInfo("taskv0t1", priorityv0, hostsv0t0);
+
+    TaskRequestCaptor taskRequestCaptor = new TaskRequestCaptor(mockRMClient,
+        scheduler, drainableAppCallback);
+    TaskRequest reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0);
+    taskRequestCaptor.scheduleTask(taskv0t1);
+
+    NodeId host1 = NodeId.newInstance("host1", 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    Container container1 = Container.newInstance(cid1, host1, null, taskv0t0.capability, priorityv0, null);
+
+    // allocate one container at v0 priority
+    scheduler.onContainersAllocated(Collections.singletonList(container1));
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv0t0.task, taskv0t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t0);
+
+    // add a new request for this container
+    MockTaskInfo taskv1t0 = new MockTaskInfo("taskv1t0", priorityv1, "host1");
+    TaskRequest reqv1t0 = taskRequestCaptor.scheduleTask(taskv1t0, cid1);
+
+    // finish v0t0 successfully, verify v0t1 is skipped even though it is node-local
+    // and v1t0 assigned instead for affinity
+    assertTrue(scheduler.deallocateTask(taskv0t0.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv1t0.task, taskv1t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv1t0);
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+        unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+            appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+
+  @Test(timeout=30000)
+  public void testReuseVertexDescendants() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+
+    // vertex 0 and vertex 2 are root vertices and vertex 1 is a child of vertex 0
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(3);
+    when(mockDagInfo.getVertexDescendants(0)).thenReturn(BitSet.valueOf(new long[] { 0x2 }));
+    when(mockDagInfo.getVertexDescendants(1)).thenReturn(new BitSet());
+    when(mockDagInfo.getVertexDescendants(2)).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    Priority priorityv0 = Priority.newInstance(1);
+    Priority priorityv1 = Priority.newInstance(2);
+    Priority priorityv2 = Priority.newInstance(3);
+    String[] hostsv0t0 = { "host1", "host2" };
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, hostsv0t0);
+    when(mockApp.getVertexIndexForTask(taskv0t0.task)).thenReturn(0);
+    MockTaskInfo taskv0t1 = new MockTaskInfo("taskv0t1", priorityv0, "host3");
+    when(mockApp.getVertexIndexForTask(taskv0t1.task)).thenReturn(0);
+    MockTaskInfo taskv1t0 = new MockTaskInfo("taskv1t0", priorityv1, hostsv0t0);
+    when(mockApp.getVertexIndexForTask(taskv1t0.task)).thenReturn(1);
+    MockTaskInfo taskv2t0 = new MockTaskInfo("taskv2t0", priorityv2, hostsv0t0);
+    when(mockApp.getVertexIndexForTask(taskv2t0.task)).thenReturn(2);
+    MockTaskInfo taskv2t1 = new MockTaskInfo("taskv2t1", priorityv2, "host3");
+    when(mockApp.getVertexIndexForTask(taskv2t1.task)).thenReturn(2);
+
+    TaskRequestCaptor taskRequestCaptor = new TaskRequestCaptor(mockRMClient,
+        scheduler, drainableAppCallback);
+    TaskRequest reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0);
+    TaskRequest reqv0t1 = taskRequestCaptor.scheduleTask(taskv0t1);
+    TaskRequest reqv1t0 = taskRequestCaptor.scheduleTask(taskv1t0);
+    TaskRequest reqv2t0 = taskRequestCaptor.scheduleTask(taskv2t0);
+    taskRequestCaptor.scheduleTask(taskv2t1);
+
+    NodeId host1 = NodeId.newInstance("host1", 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    Container container1 = Container.newInstance(cid1, host1, null, taskv0t0.capability, priorityv0, null);
+
+    // allocate one container at v0 priority
+    scheduler.onContainersAllocated(Collections.singletonList(container1));
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv0t0.task, taskv0t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t0);
+
+    // finish v0t0 successfully, verify v1t0 is skipped and v2t0 assigned instead
+    // since host locality beats rack locality for unblocked vertex v2 and
+    // v1 is blocked by pending v0 request
+    assertTrue(scheduler.deallocateTask(taskv0t0.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv2t0.task, taskv2t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv2t0);
+
+    // finish v2t0 successfully, verify v0t1 is assigned since it is higher
+    // priority than v2
+    assertTrue(scheduler.deallocateTask(taskv2t0.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv0t1.task, taskv0t1.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t1);
+
+    // finish v2t0 successfully, verify v1t0 is assigned since it is now unblocked
+    assertTrue(scheduler.deallocateTask(taskv0t1.task, true, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+    verify(mockApp).taskAllocated(taskv1t0.task, taskv1t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv1t0);
+
+    // fail v1t0 and verify container is released instead of reused for v2t1
+    assertTrue(scheduler.deallocateTask(taskv1t0.task, false, null, null));
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(cid1);
+    verify(mockRMClient).releaseAssignedContainer(cid1);
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+        unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+            appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+
+  @Test(timeout=30000)
+  public void testSessionContainers() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 4000);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 5000);
+    conf.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 5);
+
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(10);
+    when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    when(mockApp.isSession()).thenReturn(true);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    final String rack1 = "/r1";
+    final String rack2 = "/r2";
+    final String rack3 = "/r3";
+    final String node1Rack1 = "n1r1";
+    final String node2Rack1 = "n2r1";
+    final String node1Rack2 = "n1r2";
+    final String node2Rack2 = "n2r2";
+    final String node1Rack3 = "n1r3";
+    MockDNSToSwitchMapping.addRackMapping(node1Rack1, rack1);
+    MockDNSToSwitchMapping.addRackMapping(node2Rack1, rack1);
+    MockDNSToSwitchMapping.addRackMapping(node1Rack2, rack2);
+    MockDNSToSwitchMapping.addRackMapping(node2Rack2, rack2);
+    MockDNSToSwitchMapping.addRackMapping(node1Rack3, rack3);
+
+    Priority priorityv0 = Priority.newInstance(1);
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, node1Rack1, rack1);
+    MockTaskInfo taskv0t1 = new MockTaskInfo("taskv0t1", priorityv0, node2Rack1, rack1);
+    MockTaskInfo taskv0t2 = new MockTaskInfo("taskv0t2", priorityv0, node1Rack1, rack1);
+    MockTaskInfo taskv0t3 = new MockTaskInfo("taskv0t3", priorityv0, node2Rack1, rack1);
+    MockTaskInfo taskv0t4 = new MockTaskInfo("taskv0t4", priorityv0, node1Rack2, rack2);
+    MockTaskInfo taskv0t5 = new MockTaskInfo("taskv0t5", priorityv0, node2Rack2, rack2);
+    MockTaskInfo taskv0t6 = new MockTaskInfo("taskv0t6", priorityv0, node1Rack3, rack3);
+
+    TaskRequestCaptor taskRequestCaptor = new TaskRequestCaptor(mockRMClient,
+        scheduler, drainableAppCallback);
+    TaskRequest reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0);
+    TaskRequest reqv0t1 = taskRequestCaptor.scheduleTask(taskv0t1);
+    TaskRequest reqv0t2 = taskRequestCaptor.scheduleTask(taskv0t2);
+    TaskRequest reqv0t3 = taskRequestCaptor.scheduleTask(taskv0t3);
+    TaskRequest reqv0t4 = taskRequestCaptor.scheduleTask(taskv0t4);
+    TaskRequest reqv0t5 = taskRequestCaptor.scheduleTask(taskv0t5);
+    TaskRequest reqv0t6 = taskRequestCaptor.scheduleTask(taskv0t6);
+
+    List<Container> containers = new ArrayList<>();
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    NodeId n1r1 = NodeId.newInstance(node1Rack1, 1);
+    Container container1 = Container.newInstance(cid1, n1r1, null, taskv0t0.capability, priorityv0, null);
+    containers.add(container1);
+    ContainerId cid2 = ContainerId.newContainerId(attemptId, 2);
+    NodeId n2r1 = NodeId.newInstance(node2Rack1, 1);
+    Container container2 = Container.newInstance(cid2, n2r1, null, taskv0t1.capability, priorityv0, null);
+    containers.add(container2);
+    ContainerId cid3 = ContainerId.newContainerId(attemptId, 3);
+    Container container3 = Container.newInstance(cid3, n1r1, null, taskv0t2.capability, priorityv0, null);
+    containers.add(container3);
+    ContainerId cid4 = ContainerId.newContainerId(attemptId, 4);
+    Container container4 = Container.newInstance(cid4, n2r1, null, taskv0t3.capability, priorityv0, null);
+    containers.add(container4);
+    ContainerId cid5 = ContainerId.newContainerId(attemptId, 5);
+    NodeId n1r2 = NodeId.newInstance(node1Rack2, 1);
+    Container container5 = Container.newInstance(cid5, n1r2, null, taskv0t4.capability, priorityv0, null);
+    containers.add(container5);
+    ContainerId cid6 = ContainerId.newContainerId(attemptId, 6);
+    NodeId n2r2 = NodeId.newInstance(node2Rack2, 1);
+    Container container6 = Container.newInstance(cid6, n2r2, null, taskv0t5.capability, priorityv0, null);
+    containers.add(container6);
+    ContainerId cid7 = ContainerId.newContainerId(attemptId, 7);
+    NodeId n1r3 = NodeId.newInstance(node1Rack3, 1);
+    Container container7 = Container.newInstance(cid7, n1r3, null, taskv0t6.capability, priorityv0, null);
+    containers.add(container7);
+
+    scheduler.onContainersAllocated(containers);
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv0t0.task, taskv0t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv0t0);
+    verify(mockApp).taskAllocated(taskv0t1.task, taskv0t1.cookie, container2);
+    verify(mockRMClient).removeContainerRequest(reqv0t1);
+    verify(mockApp).taskAllocated(taskv0t2.task, taskv0t2.cookie, container3);
+    verify(mockRMClient).removeContainerRequest(reqv0t2);
+    verify(mockApp).taskAllocated(taskv0t3.task, taskv0t3.cookie, container4);
+    verify(mockRMClient).removeContainerRequest(reqv0t3);
+    verify(mockApp).taskAllocated(taskv0t4.task, taskv0t4.cookie, container5);
+    verify(mockRMClient).removeContainerRequest(reqv0t4);
+    verify(mockApp).taskAllocated(taskv0t5.task, taskv0t5.cookie, container6);
+    verify(mockRMClient).removeContainerRequest(reqv0t5);
+    verify(mockApp).taskAllocated(taskv0t6.task, taskv0t6.cookie, container7);
+    verify(mockRMClient).removeContainerRequest(reqv0t6);
+
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    assertTrue(scheduler.deallocateTask(taskv0t0.task, true, null, null));
+    assertTrue(scheduler.deallocateTask(taskv0t1.task, true, null, null));
+    assertTrue(scheduler.deallocateTask(taskv0t2.task, true, null, null));
+    assertTrue(scheduler.deallocateTask(taskv0t3.task, true, null, null));
+    assertTrue(scheduler.deallocateTask(taskv0t4.task, true, null, null));
+    assertTrue(scheduler.deallocateTask(taskv0t5.task, true, null, null));
+    assertTrue(scheduler.deallocateTask(taskv0t6.task, true, null, null));
+    verify(mockApp, never()).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, never()).releaseAssignedContainer(any(ContainerId.class));
+
+    // verify only two of the containers were released after idle expiration
+    // and the rest were spread across the nodes and racks
+    clock.incrementTime(5000);
+    drainableAppCallback.drain();
+    verify(mockApp, times(2)).containerBeingReleased(any(ContainerId.class));
+    verify(mockRMClient, times(2)).releaseAssignedContainer(any(ContainerId.class));
+    Set<String> hosts = new HashSet<>();
+    Set<String> racks = new HashSet<>();
+    for (HeldContainer hc : scheduler.getSessionContainers()) {
+      hosts.add(hc.getHost());
+      racks.add(hc.getRack());
+    }
+    assertEquals(5, hosts.size());
+    assertEquals(3, racks.size());
+    assertTrue(hosts.contains(node1Rack1));
+    assertTrue(hosts.contains(node2Rack1));
+    assertTrue(hosts.contains(node1Rack2));
+    assertTrue(hosts.contains(node2Rack2));
+    assertTrue(hosts.contains(node1Rack3));
+    assertTrue(racks.contains(rack1));
+    assertTrue(racks.contains(rack2));
+    assertTrue(racks.contains(rack3));
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+        unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+            appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+
+  @Test(timeout=50000)
+  public void testPreemptionNoHeadroom() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE, 10);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS, 60 * 1000);
+
+    // vertex 0 and vertex 2 are root vertices and vertex 1 is a child of vertex 0
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(3);
+    when(mockDagInfo.getVertexDescendants(0)).thenReturn(BitSet.valueOf(new long[] { 0x2 }));
+    when(mockDagInfo.getVertexDescendants(1)).thenReturn(new BitSet());
+    when(mockDagInfo.getVertexDescendants(2)).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    Priority priorityv0 = Priority.newInstance(1);
+    Priority priorityv1 = Priority.newInstance(2);
+    Priority priorityv2 = Priority.newInstance(3);
+    String[] hostsv0t0 = { "host1", "host2" };
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, hostsv0t0);
+    when(mockApp.getVertexIndexForTask(taskv0t0.task)).thenReturn(0);
+    MockTaskInfo taskv0t1 = new MockTaskInfo("taskv0t1", priorityv0, hostsv0t0);
+    when(mockApp.getVertexIndexForTask(taskv0t1.task)).thenReturn(0);
+    MockTaskInfo taskv1t0 = new MockTaskInfo("taskv1t0", priorityv1, hostsv0t0);
+    when(mockApp.getVertexIndexForTask(taskv1t0.task)).thenReturn(1);
+    MockTaskInfo taskv1t1 = new MockTaskInfo("taskv1t1", priorityv1, hostsv0t0);
+    when(mockApp.getVertexIndexForTask(taskv1t1.task)).thenReturn(1);
+    MockTaskInfo taskv2t0 = new MockTaskInfo("taskv2t0", priorityv2, hostsv0t0);
+    when(mockApp.getVertexIndexForTask(taskv2t0.task)).thenReturn(2);
+
+    // asks for two tasks for vertex 1 and start running one of them
+    TaskRequestCaptor taskRequestCaptor = new TaskRequestCaptor(mockRMClient,
+        scheduler, drainableAppCallback);
+    TaskRequest reqv1t0 = taskRequestCaptor.scheduleTask(taskv1t0);
+    TaskRequest reqv1t1 = taskRequestCaptor.scheduleTask(taskv1t1);
+    NodeId host1 = NodeId.newInstance("host1", 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    Container container1 = Container.newInstance(cid1, host1, null, taskv1t0.capability, priorityv1, null);
+    scheduler.onContainersAllocated(Collections.singletonList(container1));
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv1t0.task, taskv1t0.cookie, container1);
+    verify(mockRMClient).removeContainerRequest(reqv1t0);
+
+    // start running the other task for vertex 1 a bit later
+    clock.incrementTime(1000);
+    ContainerId cid2 = ContainerId.newContainerId(attemptId, 2);
+    Container container2 = Container.newInstance(cid2, host1, null, taskv1t0.capability, priorityv1, null);
+    scheduler.onContainersAllocated(Collections.singletonList(container2));
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv1t1.task, taskv1t1.cookie, container2);
+    verify(mockRMClient).removeContainerRequest(reqv1t1);
+
+    // add a request for vertex 0 but there is no headroom
+    when(mockRMClient.getAvailableResources()).thenReturn(Resources.none());
+    TaskRequest reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0);
+
+    // should preempt after enough heartbeats to get past preemption interval
+    // only the youngest container should be preempted to meet the demand
+    scheduler.getProgress();
+    scheduler.getProgress();
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockApp, times(1)).preemptContainer(any(ContainerId.class));
+    verify(mockApp).preemptContainer(cid2);
+    assertEquals(taskv1t1.task, scheduler.deallocateContainer(cid2));
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(cid2);
+    verify(mockRMClient).releaseAssignedContainer(cid2);
+    verify(mockApp, never()).containerBeingReleased(cid1);
+    verify(mockRMClient, never()).releaseAssignedContainer(cid1);
+
+    // add a request for vertex 2 and allocate another container
+    clock.incrementTime(1000);
+    taskRequestCaptor.scheduleTask(taskv2t0);
+    ContainerId cid3 = ContainerId.newContainerId(attemptId, 3);
+    Container container3 = Container.newInstance(cid3, host1, null, taskv0t0.capability, priorityv0, null);
+    scheduler.onContainersAllocated(Collections.singletonList(container3));
+    drainableAppCallback.drain();
+    verify(mockApp).taskAllocated(taskv0t0.task, taskv0t0.cookie, container3);
+    verify(mockRMClient).removeContainerRequest(reqv0t0);
+
+    // no more preemptions since v1 is not a descendant of v2
+    scheduler.getProgress();
+    scheduler.getProgress();
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockApp, times(1)).preemptContainer(any(ContainerId.class));
+
+    // adding request for v0 should trigger preemption on next heartbeat
+    taskRequestCaptor.scheduleTask(taskv0t1);
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockApp, times(2)).preemptContainer(any(ContainerId.class));
+    verify(mockApp).preemptContainer(cid1);
+    assertEquals(taskv1t0.task, scheduler.deallocateContainer(cid1));
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(cid1);
+    verify(mockRMClient).releaseAssignedContainer(cid1);
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+        unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+            appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+
+  @Test(timeout=50000)
+  public void testIdleContainerAssignment() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 4000);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 5000);
+    conf.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 5);
+
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(10);
+    when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    when(mockApp.isSession()).thenReturn(true);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    final String rack1 = "/r1";
+    final String rack2 = "/r2";
+    final String node1Rack1 = "n1r1";
+    final String node2Rack1 = "n2r1";
+    final String node1Rack2 = "n1r2";
+    MockDNSToSwitchMapping.addRackMapping(node1Rack1, rack1);
+    MockDNSToSwitchMapping.addRackMapping(node2Rack1, rack1);
+    MockDNSToSwitchMapping.addRackMapping(node1Rack2, rack2);
+
+    Priority priorityv0 = Priority.newInstance(1);
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, node1Rack1, rack1);
+
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    NodeId n2r1 = NodeId.newInstance(node2Rack1, 1);
+    Container container1 = Container.newInstance(cid1, n2r1, null, taskv0t0.capability, priorityv0, null);
+
+    // verify idle container is kept for now
+    scheduler.onContainersAllocated(Collections.singletonList(container1));
+    clock.incrementTime(2000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(cid1);
+    verify(mockRMClient, never()).releaseAssignedContainer(cid1);
+
+    // verify idle container is released without being assigned to a task because rack-local reuse is
+    // disabled
+    TaskRequestCaptor taskRequestCaptor = new TaskRequestCaptor(mockRMClient,
+        scheduler, drainableAppCallback);
+    TaskRequest reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0);
+    clock.incrementTime(10000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).taskAllocated(taskv0t0.task, taskv0t0.cookie, container1);
+    verify(mockRMClient, never()).removeContainerRequest(reqv0t0);
+    verify(mockApp, never()).containerBeingReleased(cid1);
+    verify(mockRMClient).releaseAssignedContainer(cid1);
+
+    // cancel the task request
+    assertFalse(scheduler.deallocateTask(taskv0t0.task, false, null, null));
+
+    // allocate another container that's node-local
+    ContainerId cid2 = ContainerId.newContainerId(attemptId, 2);
+    NodeId n1r1 = NodeId.newInstance(node1Rack1, 1);
+    Container container2 = Container.newInstance(cid2, n1r1, null, taskv0t0.capability, priorityv0, null);
+    scheduler.onContainersAllocated(Collections.singletonList(container2));
+    clock.incrementTime(2000);
+    drainableAppCallback.drain();
+    verify(mockApp, never()).containerBeingReleased(cid2);
+    verify(mockRMClient, never()).releaseAssignedContainer(cid2);
+
+    // reschedule the task, verify it's now scheduled without a container request
+    // since node-local idle container is available
+    reqv0t0 = taskRequestCaptor.scheduleTask(taskv0t0, false);
+    verify(mockApp).taskAllocated(taskv0t0.task, taskv0t0.cookie, container2);
+    verify(mockRMClient).removeContainerRequest(reqv0t0);
+  }
+
+  static class AMRMClientAsyncWrapperForTest extends AMRMClientAsyncWrapper {
+    AMRMClientAsyncWrapperForTest() {
+      super(new MockAMRMClient(), 10000, null);
+    }
+
+    RegisterApplicationMasterResponse getRegistrationResponse() {
+      return ((MockAMRMClient) client).getRegistrationResponse();
+    }
+  }
+
+  static class MockAMRMClient extends AMRMClientImpl<TaskRequest> {
+    private RegisterApplicationMasterResponse mockRegResponse;
+
+    MockAMRMClient() {
+      super();
+      this.clusterAvailableResources = Resource.newInstance(4000, 4);
+      this.clusterNodeCount = 5;
+    }
+
+    @Override
+    protected void serviceStart() {
+    }
+
+    @Override
+    protected void serviceStop() {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        String appHostName, int appHostPort, String appTrackingUrl) {
+      mockRegResponse = mock(RegisterApplicationMasterResponse.class);
+      Resource mockMaxResource = Resources.createResource(1024*1024, 1024);
+      Map<ApplicationAccessType, String> mockAcls = Collections.emptyMap();
+      when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
+          mockMaxResource);
+      when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+      when(mockRegResponse.getSchedulerResourceTypes()).thenReturn(
+          EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU));
+      return mockRegResponse;
+    }
+
+    @Override
+    public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+        String appMessage, String appTrackingUrl) {
+    }
+
+    RegisterApplicationMasterResponse getRegistrationResponse() {
+      return mockRegResponse;
+    }
+  }
+
+  static class MockTask {
+    final String name;
+
+    MockTask(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+
+  static class MockTaskInfo {
+    final static Object DEFAULT_SIGNATURE = new Object();
+
+    final MockTask task;
+    final Object cookie = new Object();
+    final Object signature = DEFAULT_SIGNATURE;
+    final String[] hosts;
+    final String[] racks;
+    final Priority priority;
+    final Resource capability;
+
+    MockTaskInfo(String name, Priority priority, String host) {
+      this(name, priority, host == null ? null : new String[] { host });
+    }
+
+    MockTaskInfo(String name, Priority priority, String[] hosts) {
+      this(name, priority, hosts, buildDefaultRacks(hosts));
+    }
+
+    MockTaskInfo(String name, Priority priority, String host, String rack) {
+      this(name, priority, host == null ? null : new String[] { host },
+          rack == null ? null : new String[] { rack });
+    }
+
+    MockTaskInfo(String name, Priority priority, String[] hosts, String[] racks) {
+      this.task = new MockTask(name);
+      this.hosts = hosts;
+      this.racks = racks;
+      this.priority = priority;
+      this.capability = Resource.newInstance(1024, 1);
+    }
+
+    static String[] buildDefaultRacks(String[] hosts) {
+      if (hosts == null) {
+        return null;
+      }
+      String[] racks = new String[hosts.length];
+      Arrays.fill(racks, "/default-rack");
+      return racks;
+    }
+  }
+
+  static class TaskRequestCaptor {
+    final AMRMClientAsync<TaskRequest> client;
+    final TaskScheduler scheduler;
+    final TaskSchedulerContextDrainable drainableAppCallback;
+    final ArgumentCaptor<TaskRequest> captor = ArgumentCaptor.forClass(TaskRequest.class);
+    int invocationCount = 0;
+
+    TaskRequestCaptor(AMRMClientAsync<TaskRequest> client, TaskScheduler scheduler,
+        TaskSchedulerContextDrainable drainableAppCallback) {
+      this.client = client;
+      this.scheduler = scheduler;
+      this.drainableAppCallback = drainableAppCallback;
+    }
+
+    TaskRequest scheduleTask(MockTaskInfo taskInfo) throws Exception {
+      return scheduleTask(taskInfo, true);
+    }
+
+    TaskRequest scheduleTask(MockTaskInfo taskInfo, boolean expectContainerRequest) throws Exception {
+      scheduler.allocateTask(taskInfo.task, taskInfo.capability, taskInfo.hosts, taskInfo.racks,
+          taskInfo.priority, taskInfo.signature, taskInfo.cookie);
+      drainableAppCallback.drain();
+      if (expectContainerRequest) {
+        ++invocationCount;
+      }
+      verify(client, times(invocationCount)).addContainerRequest(captor.capture());
+      TaskRequest request = captor.getValue();
+      assertEquals(request.getTask(), taskInfo.task);
+      assertEquals(request.getCookie(), taskInfo.cookie);
+      return request;
+    }
+
+    TaskRequest scheduleTask(MockTaskInfo taskInfo, ContainerId affinity) throws Exception {
+      scheduler.allocateTask(taskInfo.task, taskInfo.capability, affinity, taskInfo.priority,
+          taskInfo.signature, taskInfo.cookie);
+      drainableAppCallback.drain();
+      verify(client, times(++invocationCount)).addContainerRequest(captor.capture());
+      TaskRequest request = captor.getValue();
+      assertEquals(request.getTask(), taskInfo.task);
+      assertEquals(request.getCookie(), taskInfo.cookie);
+      return request;
+    }
+  }
+
+  static class NewTaskSchedulerForTest extends DagAwareYarnTaskScheduler {
+    final AMRMClientAsyncWrapper mockClient;
+    final MockClock clock;
+
+    NewTaskSchedulerForTest(
+        TaskSchedulerContextDrainable appClient,
+        AMRMClientAsyncWrapper client, MockClock clock) {
+      super(appClient);
+      this.mockClient = client;
+      this.clock = clock;
+      setShouldUnregister();
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      initialize(mockClient);
+    }
+
+    @Override
+    protected ScheduledExecutorService createExecutor() {
+      return new ControlledScheduledExecutorService(clock);
+    }
+
+    @Override
+    protected long now() {
+      return clock.getTime();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a9b8bb5a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 2c2452b..9cd1e23 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -366,6 +366,11 @@ class TestTaskSchedulerHelpers {
     }
 
     @Override
+    public int getVertexIndexForTask(Object task) {
+      return real.getVertexIndexForTask(task);
+    }
+
+    @Override
     public void preemptContainer(ContainerId cId) {
       invocations++;
       real.preemptContainer(cId);

http://git-wip-us.apache.org/repos/asf/tez/blob/a9b8bb5a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
index b0eeaef..ab446ac 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
@@ -17,6 +17,8 @@ package org.apache.tez.dag.helpers;
 import org.apache.hadoop.security.Credentials;
 import org.apache.tez.serviceplugins.api.DagInfo;
 
+import java.util.BitSet;
+
 public class DagInfoImplForTest implements DagInfo {
 
   private final int index;
@@ -41,4 +43,14 @@ public class DagInfoImplForTest implements DagInfo {
   public Credentials getCredentials() {
     return null;
   }
+
+  @Override
+  public int getTotalVertices() {
+    return 0;
+  }
+
+  @Override
+  public BitSet getVertexDescendants(int vertexIndex) {
+    return null;
+  }
 }


Mime
View raw message