tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-585. Unify code path for reused and new containers (bikas)
Date Mon, 28 Oct 2013 17:21:38 GMT
Updated Branches:
  refs/heads/master 8ed793a6f -> 84c6c0bc1


TEZ-585. Unify code path for reused and new containers (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/84c6c0bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/84c6c0bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/84c6c0bc

Branch: refs/heads/master
Commit: 84c6c0bc1e82fdc385be79e4f54e17640af26c96
Parents: 8ed793a
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Oct 28 10:18:58 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Oct 28 10:18:58 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 146 +++++++++++++++----
 .../tez/dag/app/rm/TestContainerReuse.java      |  36 ++++-
 .../tez/dag/app/rm/TestTaskScheduler.java       |   8 +-
 3 files changed, 156 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/84c6c0bc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index e3903f9..8b876e1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -387,7 +388,7 @@ public class TaskScheduler extends AbstractService
         Object task = releasedContainers.remove(completedId);
         if(task != null){
           if (delayedContainer != null) {
-            LOG.warn("Held container sohuld be null since releasedContainer is not");
+            LOG.warn("Held container should be null since releasedContainer is not");
           }
           // TODO later we may want to check if exit code matched expectation
           // e.g. successful container should not come back fail exit code after
@@ -437,9 +438,15 @@ public class TaskScheduler extends AbstractService
     Map<CookieContainerRequest, Container> assignedContainers;
 
     synchronized (this) {
-      List<Container> modifiableContainerList = Lists.newLinkedList(containers);
-      assignedContainers = assignNewlyAllocatedContainers(
-          modifiableContainerList);
+      if (!shouldReuseContainers) {
+        List<Container> modifiableContainerList = Lists.newLinkedList(containers);
+        assignedContainers = assignNewlyAllocatedContainers(
+            modifiableContainerList);
+      } else {
+        // unify allocations
+        pushNewContainerToDelayed(containers);
+        return;
+      }
     }
 
     // upcall to app must be outside locks
@@ -503,15 +510,18 @@ public class TaskScheduler extends AbstractService
       assignDelayedContainer(HeldContainer heldContainer) {
 
     DAGAppMasterState state = appContext.getAMState();
-
+    boolean isNew = heldContainer.isNew();
     if (LOG.isDebugEnabled()) {
-      LOG.info("Trying to assign a delayed container"
+      LOG.debug("Trying to assign a delayed container"
         + ", containerId=" + heldContainer.getContainer().getId()
         + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
         + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
         + ", AMState=" + state
         + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
-        + ", taskRequestsCount=" + taskRequests.size());
+        + ", taskRequestsCount=" + taskRequests.size()
+        + ", heldContainers=" + heldContainers.size()
+        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+        + ", isNew=" + isNew);
     }
 
     if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
@@ -521,13 +531,17 @@ public class TaskScheduler extends AbstractService
 
       heldContainer.resetLocalityMatchLevel();
       long currentTime = System.currentTimeMillis();
-      if (heldContainer.getContainerExpiryTime() <= currentTime
-          && sessionDelay != -1) {
-        LOG.info("Container's session delay expired. Releasing container"
+      if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
+          && sessionDelay != -1)) {
+        LOG.info("Container's session delay expired or is new. Releasing container"
           + ", containerId=" + heldContainer.container.getId()
           + ", containerExpiryTime="
           + heldContainer.getContainerExpiryTime()
-          + ", sessionDelay=" + sessionDelay);
+          + ", sessionDelay=" + sessionDelay
+          + ", taskRequestsCount=" + taskRequests.size()
+          + ", heldContainers=" + heldContainers.size()
+          + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+          + ", isNew=" + isNew);
         releaseUnassignedContainers(
             Lists.newArrayList(heldContainer.container));
       } else {
@@ -555,7 +569,8 @@ public class TaskScheduler extends AbstractService
 
       // if match level is NEW or NODE, match only at node-local
       // always try node local matches for other levels
-      if (localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NEW)
+      if (isNew
+          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NEW)
           || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NODE)
           || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
           || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NON_LOCAL)) {
@@ -571,7 +586,7 @@ public class TaskScheduler extends AbstractService
       // match against rack if match level is RACK or NON-LOCAL
       // if scheduling delay is 0, match at RACK allowed without a sleep
       if (assignedContainers.isEmpty()) {
-        if (reuseRackLocal && (localitySchedulingDelay == 0 ||
+        if ((reuseRackLocal || isNew) && (localitySchedulingDelay == 0 ||
           (localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
             || localityMatchLevel.equals(
               HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
@@ -588,7 +603,7 @@ public class TaskScheduler extends AbstractService
       // match against rack if match level is NON-LOCAL
       // if scheduling delay is 0, match at NON-LOCAL allowed without a sleep
       if (assignedContainers.isEmpty()) {
-        if (reuseNonLocal && (localitySchedulingDelay == 0
+        if ((reuseNonLocal || isNew) && (localitySchedulingDelay == 0
             || localityMatchLevel.equals(
                 HeldContainer.LocalityMatchLevel.NON_LOCAL))) {
          assignReUsedContainerWithLocation(containerToAssign,
@@ -632,6 +647,12 @@ public class TaskScheduler extends AbstractService
                         HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
               hitFinalMatchLevel = true;
             }
+            // the above if-stmt does not apply to new containers since they will
+            // be matched at all locality levels. So there finalMatchLevel cannot
+            // be short-circuited
+            if (localitySchedulingDelay > 0 && isNew) {
+              hitFinalMatchLevel = false;
+            }
           }
 
           if (hitFinalMatchLevel) {
@@ -642,7 +663,8 @@ public class TaskScheduler extends AbstractService
                 + " unmatched requests or this is not a session"
                 + ", containerId=" + heldContainer.container.getId()
                 + ", pendingTasks=" + !taskRequests.isEmpty()
-                + ", isSession=" + appContext.isSession());
+                + ", isSession=" + appContext.isSession()
+                + ". isNew=" + isNew);
               releaseUnassignedContainers(
                 Lists.newArrayList(heldContainer.container));
             } else {
@@ -755,9 +777,11 @@ public class TaskScheduler extends AbstractService
 
     addTaskRequest(task, request);
     // See if any of the delayedContainers can be used for this task.
-    delayedContainerManager.tryAssigningAll();
+    delayedContainerManager.triggerScheduling(true);
     LOG.info("Allocation request for task: " + task +
-      " with request: " + request);
+      " with request: " + request + 
+      " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") +
+      " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null"));
   }
 
   /**
@@ -814,7 +838,7 @@ public class TaskScheduler extends AbstractService
     }
     return true;
   }
-
+  
   public synchronized Object deallocateContainer(ContainerId containerId) {
     Object task = unAssignContainer(containerId, true);
     if(task != null) {
@@ -832,9 +856,15 @@ public class TaskScheduler extends AbstractService
       allocatedResources);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
-        " cpu:" + allocatedResources.getVirtualCores());
+        " cpu:" + allocatedResources.getVirtualCores() + 
+        " delayedContainers: " + delayedContainerManager.delayedContainers.size());
     }
     assert freeResources.getMemory() >= 0;
+    
+    if (delayedContainerManager.delayedContainers.size() > 0) {
+      // if we are holding onto containers then nothing to preempt from outside
+      return;
+    }
 
     CookieContainerRequest highestPriRequest = null;
     for(CookieContainerRequest request : taskRequests.values()) {
@@ -926,7 +956,7 @@ public class TaskScheduler extends AbstractService
   private boolean canAssignTaskToContainer(
       CookieContainerRequest cookieContainerRequest, Container container) {
     HeldContainer heldContainer = heldContainers.get(container.getId());
-    if (heldContainer == null) { // New container.
+    if (heldContainer == null || heldContainer.isNew()) { // New container.
       return true;
     } else {
       if (LOG.isDebugEnabled()) {
@@ -982,10 +1012,45 @@ public class TaskScheduler extends AbstractService
     Container result = taskAllocations.put(task, container);
     assert result == null;
     containerAssignments.put(container.getId(), task);
-    if (heldContainers.put(container.getId(), new HeldContainer(container,
-        -1, -1, assigned.getCookie().getContainerSignature())) == null) {
+    HeldContainer heldContainer = heldContainers.get(container.getId()); 
+    if (heldContainer == null) {
+      heldContainers.put(container.getId(), new HeldContainer(container,
+        -1, -1, assigned.getCookie().getContainerSignature()));
       Resources.addTo(allocatedResources, container.getResource());
+    } else {
+      if (heldContainer.isNew()) {
+        // check for existence before adding since the first container potentially
+        // has the broadest signature as subsequent uses dont expand any dimension.
+        // This will need to be enhanced to track other signatures too when we
+        // think about preferring within vertex matching etc.
+        heldContainers.put(container.getId(),
+            new HeldContainer(container, heldContainer.getNextScheduleTime(),
+                heldContainer.getContainerExpiryTime(), assigned.getCookie()
+                    .getContainerSignature()));
+      }
+    }
+  }
+  
+  private void pushNewContainerToDelayed(List<Container> containers){
+    long expireTime = -1;
+    if (sessionDelay > 0) {
+      long currentTime = System.currentTimeMillis();
+      expireTime = currentTime + sessionDelay;
+    }
+
+    synchronized (delayedContainerManager) {
+      for (Container container : containers) {
+        if (heldContainers.put(container.getId(), new HeldContainer(container,
+            -1, expireTime, null)) != null) {
+          throw new TezUncheckedException("New container " + container.getId()
+              + " is already held.");
+        }
+        Resources.addTo(allocatedResources, container.getResource());
+          delayedContainerManager.addDelayedContainer(container,
+              delayedContainerManager.maxScheduleTimeSeen + 1);
+      }
     }
+    delayedContainerManager.triggerScheduling(false);      
   }
 
   private CookieContainerRequest removeTaskRequest(Object task) {
@@ -1058,16 +1123,14 @@ public class TaskScheduler extends AbstractService
     Iterator<Container> containerIterator = containers.iterator();
     while (containerIterator.hasNext()) {
       Container container = containerIterator.next();
-      CookieContainerRequest assigned =
-        assigner.assignReUsedContainer(container, honorLocality);
-      if (assigned != null) {
-        assignedContainers.put(assigned, container);
+      if (assignReUsedContainerWithLocation(container, assigner,
+          assignedContainers, honorLocality)) {
         containerIterator.remove();
       }
     }
   }
 
-  private synchronized void assignReUsedContainerWithLocation(
+  private synchronized boolean assignReUsedContainerWithLocation(
     Container container,
     ContainerAssigner assigner,
     Map<CookieContainerRequest, Container> assignedContainers,
@@ -1077,7 +1140,9 @@ public class TaskScheduler extends AbstractService
       assigner.assignReUsedContainer(container, honorLocality);
     if (assigned != null) {
       assignedContainers.put(assigned, container);
+      return true;
     }
+    return false;
   }
 
   private void releaseUnassignedContainers(Iterable<Container> containers) {
@@ -1137,6 +1202,7 @@ public class TaskScheduler extends AbstractService
         + ", honorLocalityFlags=" + honorLocalityFlags
         + ", reusedContainer="
         + containerAssignments.containsKey(container.getId())
+        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
         + ", containerResourceMemory=" + container.getResource().getMemory()
         + ", containerResourceVCores="
         + container.getResource().getVirtualCores());
@@ -1191,6 +1257,8 @@ public class TaskScheduler extends AbstractService
     @Override
     public CookieContainerRequest assignReUsedContainer(
       Container container, boolean honorLocality) {
+      // TEZ-586 this is not match an actual rackLocal request unless honorLocality
+      // is false. This method is useless if honorLocality=true
       if (!honorLocality) {
         String location = RackResolver.resolve(container.getNodeId().getHost())
           .getNetworkLocation();
@@ -1254,6 +1322,11 @@ public class TaskScheduler extends AbstractService
 
     private volatile boolean tryAssigningAll = false;
     private volatile boolean running = true;
+    private long maxScheduleTimeSeen = -1;
+    
+    // used for testing only
+    @VisibleForTesting
+    AtomicBoolean drainedDelayedContainers = null;
 
     DelayedContainerManager() {
       super.setName("DelayedContainerManager");
@@ -1273,6 +1346,12 @@ public class TaskScheduler extends AbstractService
         // locality at this point.
         if (delayedContainers.peek() == null) {
           try {
+            if (drainedDelayedContainers != null) {
+              drainedDelayedContainers.set(true);
+              synchronized (drainedDelayedContainers) {
+                drainedDelayedContainers.notifyAll();
+              }
+            }
             synchronized(this) {
               this.wait();
             }
@@ -1290,6 +1369,8 @@ public class TaskScheduler extends AbstractService
           long nextScheduleTs = heldContainer.getNextScheduleTime();
           if (currentTs >= nextScheduleTs) {
             // Remove the container and try scheduling it.
+            // TEZ-587 what if container is released by RM after this
+            // in onContainerCompleted()
             heldContainer = delayedContainers.poll();
             if (heldContainer == null) {
               continue;
@@ -1353,8 +1434,8 @@ public class TaskScheduler extends AbstractService
      * Indicate that an attempt should be made to allocate all available containers.
      * Intended to be used in cases where new Container requests come in 
      */
-    public void tryAssigningAll() {
-      this.tryAssigningAll = true;
+    public void triggerScheduling(boolean scheduleAll) {
+      this.tryAssigningAll = scheduleAll;
       synchronized(this) {
         this.notify();
       }
@@ -1384,6 +1465,9 @@ public class TaskScheduler extends AbstractService
       } else {
         delayedContainer.setNextScheduleTime(nextScheduleTime);
       }
+      if (maxScheduleTimeSeen < nextScheduleTime) {
+        maxScheduleTimeSeen = nextScheduleTime;
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding container to delayed queue"
           + ", containerId=" + delayedContainer.getContainer().getId()
@@ -1461,6 +1545,10 @@ public class TaskScheduler extends AbstractService
       this.containerExpiryTime = containerExpiryTime;
     }
     
+    boolean isNew() {
+      return containerSignature == null;
+    }
+    
     public Container getContainer() {
       return this.container;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/84c6c0bc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 5a5502d..d14c74f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
@@ -84,7 +85,15 @@ import com.google.common.collect.Lists;
 
 public class TestContainerReuse {
 
-
+  void waitForDelayedDrainNotify(AtomicBoolean drainNotifier)
+      throws InterruptedException {
+    while (!drainNotifier.get()) {
+      synchronized (drainNotifier) {
+        drainNotifier.wait();
+      }
+    }
+  }
+  
   @Test(timeout = 15000l)
   public void testDelayedReuseContainerBecomesAvailable()
       throws IOException, InterruptedException, ExecutionException {
@@ -142,6 +151,9 @@ public class TestContainerReuse {
     TaskSchedulerAppCallbackDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
 
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+
     Resource resource = Resource.newInstance(1024, 1);
     Priority priority = Priority.newInstance(5);
     String [] host1 = {"host1"};
@@ -175,8 +187,10 @@ public class TestContainerReuse {
     Container containerHost1 = createContainer(1, host1[0], resource, priority);
     Container containerHost2 = createContainer(2, host2[0], resource, priority);
 
+    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(
       Lists.newArrayList(containerHost1, containerHost2));
+    waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
       eq(ta11), any(Object.class), eq(containerHost1));
@@ -272,6 +286,9 @@ public class TestContainerReuse {
           .getSpyTaskScheduler();
     TaskSchedulerAppCallbackDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
+    
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
 
     Resource resource = Resource.newInstance(1024, 1);
     Priority priority = Priority.newInstance(5);
@@ -300,7 +317,9 @@ public class TestContainerReuse {
     Container containerHost1 = createContainer(1, host1[0], resource, priority);
     Container containerHost2 = createContainer(2, host2[0], resource, priority);
 
+    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
+    waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
@@ -363,6 +382,8 @@ public class TestContainerReuse {
     TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback)
((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
     TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String[] host1 = {"host1"};
@@ -401,7 +422,9 @@ public class TestContainerReuse {
     Container container1 = createContainer(1, "host1", resource1, priority1);
 
     // One container allocated.
+    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
 
@@ -436,7 +459,9 @@ public class TestContainerReuse {
     Container container2 = createContainer(2, "host2", resource1, priority1);
 
     // Second container allocated. Should be allocated to the last task.
+    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+    waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
 
@@ -504,6 +529,8 @@ public class TestContainerReuse {
         .getSpyTaskScheduler();
     TaskSchedulerAppCallbackDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String [] emptyHosts = new String[0];
@@ -533,7 +560,9 @@ public class TestContainerReuse {
     Container container1 = createContainer(1, "randomHost", resource1, priority);
 
     // One container allocated.
+    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
       eq(ta11), any(Object.class), eq(container1));
@@ -622,6 +651,9 @@ public class TestContainerReuse {
           .getSpyTaskScheduler();
     TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
 
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+
     Resource resource1 = Resource.newInstance(1024, 1);
     String[] host1 = {"host1"};
 
@@ -654,7 +686,9 @@ public class TestContainerReuse {
     Container container1 = createContainer(1, host1[0], resource1, priority1);
 
     // One container allocated.
+    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
       eq(ta11), any(Object.class), eq(container1));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/84c6c0bc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 26b62d0..75c1026 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -392,6 +392,7 @@ public class TestTaskScheduler {
         .getDrainableAppCallback();
 
     Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     scheduler.init(conf);
 
     RegisterApplicationMasterResponse mockRegResponse =
@@ -577,7 +578,9 @@ public class TestTaskScheduler {
     TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler
         .getDrainableAppCallback();
     
-    taskScheduler.init(new Configuration());
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    taskScheduler.init(conf);
     
     RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
     Resource mockMaxResource = mock(Resource.class);
@@ -590,9 +593,6 @@ public class TestTaskScheduler {
     
     taskScheduler.start();
     
-    taskScheduler.serviceInit(new Configuration());
-    taskScheduler.serviceStart();
-
     Resource resource = Resource.newInstance(1024, 1);
     Priority priority = Priority.newInstance(1);
 


Mime
View raw message