tez-commits mailing list archives

From: bi...@apache.org
Subject: [1/2] git commit: TEZ-1269. TaskScheduler prematurely releases containers (bikas)
Date: Thu, 17 Jul 2014 20:55:31 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 36d08fe03 -> e8dc9f72f


TEZ-1269. TaskScheduler prematurely releases containers (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/061dfe77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/061dfe77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/061dfe77

Branch: refs/heads/master
Commit: 061dfe77f9f7c8402f0df7cdabcbe2cd858f4bdf
Parents: 36d08fe
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Jul 17 13:54:18 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Jul 17 13:54:18 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  35 ++-
 .../dag/app/rm/YarnTaskSchedulerService.java    | 243 +++++++++++++++----
 .../tez/dag/app/rm/TestContainerReuse.java      |  25 +-
 .../tez/dag/app/rm/TestTaskScheduler.java       | 216 ++++++++++++++++-
 .../examples/BroadcastAndOneToOneExample.java   |   2 +-
 5 files changed, 461 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/061dfe77/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 86d4e58..d3a0c6d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -275,14 +275,37 @@ public class TezConfiguration extends Configuration {
     TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT = 1000l;
 
   /**
-   * The amount of time to hold on to a container if no task can be assigned to
-   * it immediately. Only active when reuse is enabled. Set to -1 to never
-   * release a container in a session.
+   * The minimum amount of time to hold on to a container if no task can be
+   * assigned to it immediately. Only active when reuse is enabled. Set to -1 to
+   * never release a container in a session.
    */
-  public static final String TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS =
-    TEZ_AM_PREFIX + "container.session.delay-allocation-millis";
+  public static final String TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS = 
+    TEZ_AM_PREFIX + "container.idle.release-timeout-min.millis";
   public static final long
-    TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS_DEFAULT = 10000l;
+    TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT = 5000l;  
+
+  /**
+   * The maximum amount of time to hold on to a container if no task can be
+   * assigned to it immediately. Only active when reuse is enabled. The value
+   * must be +ve and >=
+   * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS.
+   * Containers will have an expire time set to a random value between
+   * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS &&
+   * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This 
+   * creates a graceful reduction in the amount of idle resources held
+   */
+  public static final String TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS = 
+      TEZ_AM_PREFIX + "container.idle.release-timeout-max.millis";
+  public static final long
+    TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS_DEFAULT = 10000l;
+  
+  /**
+   * The minimum number of containers that will be held by the session
+   */
+  public static final String TEZ_AM_SESSION_MIN_HELD_CONTAINERS = 
+      TEZ_AM_PREFIX + "session.min.held-containers";
+  public static final int TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT = 0;
+
 
   public static final String TEZ_PB_BINARY_CONF_NAME = "tez-conf.pb";
   public static final String TEZ_PB_PLAN_BINARY_NAME = "tez-dag.pb";

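The hunk above replaces the single session-delay setting with a min/max idle-release window plus a minimum held-container count. Below is a minimal client-side sketch of setting the new properties; the keys and defaults come from the patch, while the wrapper class and example values are illustrative only. Per the javadoc above, setting the min timeout to -1 disables idle release entirely in a session.

import org.apache.tez.dag.api.TezConfiguration;

public class IdleReleaseConfigSketch {
  public static void main(String[] args) {
    TezConfiguration conf = new TezConfiguration();
    // Hold idle containers for 5s to 10s (the new defaults); each container's
    // actual expiry is randomized inside this window.
    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 5000l);
    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 10000l);
    // In session mode, keep at least 2 containers across DAGs (example value;
    // the default is 0, i.e. no minimum is held).
    conf.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 2);
  }
}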
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/061dfe77/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index f585c39..78de101 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -35,9 +35,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.random.RandomDataGenerator;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -64,6 +66,7 @@ import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -134,7 +137,12 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   DelayedContainerManager delayedContainerManager;
   long localitySchedulingDelay;
-  long sessionDelay;
+  long idleContainerTimeoutMin;
+  long idleContainerTimeoutMax = 0;
+  int sessionNumMinHeldContainers = 0;
+  Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet();
+  
+  RandomDataGenerator random = new RandomDataGenerator();
 
   @VisibleForTesting
   protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
@@ -296,11 +304,24 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     Preconditions.checkArgument(localitySchedulingDelay >= 0,
         "Locality Scheduling delay should be >=0");
 
-    sessionDelay = conf.getLong(
-        TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS,
-        TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS_DEFAULT);
-    Preconditions.checkArgument(sessionDelay >= 0 || sessionDelay == -1,
-      "Session delay should be either -1 or >=0");
+    idleContainerTimeoutMin = conf.getLong(
+        TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS,
+        TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT);
+    Preconditions.checkArgument(idleContainerTimeoutMin >= 0 || idleContainerTimeoutMin == -1,
+      "Idle container release min timeout should be either -1 or >=0");
+    
+    idleContainerTimeoutMax = conf.getLong(
+        TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS,
+        TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS_DEFAULT);
+    Preconditions.checkArgument(
+        idleContainerTimeoutMax >= 0 && idleContainerTimeoutMax >= idleContainerTimeoutMin,
+        "Idle container release max timeout should be >=0 and >= " + 
+        TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS);
+    
+    sessionNumMinHeldContainers = conf.getInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS,
+        TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT);
+    Preconditions.checkArgument(sessionNumMinHeldContainers >= 0, 
+        "Session minimum held containers should be >=0");
 
     delayedContainerManager = new DelayedContainerManager();
     LOG.info("TaskScheduler initialized with configuration: " +
@@ -309,7 +330,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             ", reuseRackLocal: " + reuseRackLocal +
             ", reuseNonLocal: " + reuseNonLocal + 
             ", localitySchedulingDelay: " + localitySchedulingDelay +
-            ", sessionDelay=" + sessionDelay);
+            ", idleContainerMinTimeout=" + idleContainerTimeoutMin +
+            ", idleContainerMaxTimeout=" + idleContainerTimeoutMax +
+            ", sessionMinHeldContainers=" + sessionNumMinHeldContainers);
   }
 
   @Override
@@ -509,6 +532,17 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
     return assignedContainers;
   }
+  
+  @VisibleForTesting
+  long getHeldContainerExpireTime(long startTime) {
+    long expireTime = (startTime + idleContainerTimeoutMin);
+    if (idleContainerTimeoutMin != -1 && idleContainerTimeoutMin < idleContainerTimeoutMax) {
+      long expireTimeMax = startTime + idleContainerTimeoutMax;
+      expireTime = random.nextLong(expireTime, expireTimeMax);
+    }
+    
+    return expireTime;
+  }
 
   /**
    * Try to assign a re-used container
@@ -539,35 +573,64 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // if sessionDelay defined, push back into delayed queue if not already
       // done so
 
+      // Compute min held containers.
+      if (appContext.isSession() && sessionNumMinHeldContainers > 0 &&
+          sessionMinHeldContainers.isEmpty()) {
+        // session mode and need to hold onto containers and not done so already
+        determineMinHeldContainers();
+      }
+      
       heldContainer.resetLocalityMatchLevel();
       long currentTime = System.currentTimeMillis();
+      boolean releaseContainer = false;
+
       if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
-          && sessionDelay != -1)) {
-        LOG.info("No taskRequests. Container's session delay expired or is new. " +
-        	"Releasing container"
-          + ", containerId=" + heldContainer.container.getId()
-          + ", containerExpiryTime="
-          + heldContainer.getContainerExpiryTime()
-          + ", sessionDelay=" + sessionDelay
-          + ", taskRequestsCount=" + taskRequests.size()
-          + ", heldContainers=" + heldContainers.size()
-          + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-          + ", isNew=" + isNew);
-        releaseUnassignedContainers(
-            Lists.newArrayList(heldContainer.container));
-      } else {
-        if (!appContext.isSession()) {
-          releaseUnassignedContainers(
-            Lists.newArrayList(heldContainer.container));
+          && idleContainerTimeoutMin != -1)) {
+        // container idle timeout has expired or is a new unused container. 
+        // new container is possibly a spurious race condition allocation.
+        if (!isNew && appContext.isSession() && 
+            sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) {
+          // Not a potentially spurious new container.
+          // In session mode and container in set of chosen min held containers
+          // increase the idle container expire time to maintain sanity with 
+          // the rest of the code
+          heldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime));
         } else {
-          // only put back in queue if this is a session
-          heldContainer.resetLocalityMatchLevel();
-          delayedContainerManager.addDelayedContainer(
-            heldContainer.getContainer(),
-            currentTime + localitySchedulingDelay);
+          releaseContainer = true;
         }
       }
-    } else if (state.equals(DAGAppMasterState.RUNNING)) {
+      
+      if (releaseContainer) {
+        LOG.info("No taskRequests. Container's idle timeout delay expired or is new. " +
+            "Releasing container"
+            + ", containerId=" + heldContainer.container.getId()
+            + ", containerExpiryTime="
+            + heldContainer.getContainerExpiryTime()
+            + ", idleTimeout=" + idleContainerTimeoutMin
+            + ", taskRequestsCount=" + taskRequests.size()
+            + ", heldContainers=" + heldContainers.size()
+            + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+            + ", isNew=" + isNew);
+          releaseUnassignedContainers(
+              Lists.newArrayList(heldContainer.container));        
+      } else {
+        // no outstanding work and container idle timeout not expired
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Holding onto idle container with no work. CId: "
+              + heldContainer.getContainer().getId() + " with expiry: "
+              + heldContainer.getContainerExpiryTime() + " currentTime: "
+              + currentTime + " next look: "
+              + (currentTime + localitySchedulingDelay));
+        }
+        // put back and wait for new requests until expiry
+        heldContainer.resetLocalityMatchLevel();
+        delayedContainerManager.addDelayedContainer(
+            heldContainer.getContainer(), currentTime
+                + localitySchedulingDelay);        
+      }
+   } else if (state.equals(DAGAppMasterState.RUNNING)) {
+      // clear min held containers since we need to allocate to tasks
+      sessionMinHeldContainers.clear();
       HeldContainer.LocalityMatchLevel localityMatchLevel =
         heldContainer.getLocalityMatchLevel();
       Map<CookieContainerRequest, Container> assignedContainers =
@@ -633,13 +696,20 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
         // Release container if final expiry time is reached
         // Dont release a new container. The RM may not give us new ones
+        // The assumption is that the expire time is larger than the sum of all
+        // locality delays. So if we hit the expire time then we have already 
+        // tried to assign at all locality levels.
+        // We run the risk of not being able to retain min held containers but 
+        // if we are not being able to assign containers to pending tasks then 
+        // we cannot avoid releasing containers. Or else we may not be able to 
+        // get new containers from YARN to match the pending request
         if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
-          && sessionDelay != -1) {
-          LOG.info("Container's session delay expired. Releasing container"
+          && idleContainerTimeoutMin != -1) {
+          LOG.info("Container's idle timeout expired. Releasing container"
             + ", containerId=" + heldContainer.container.getId()
             + ", containerExpiryTime="
             + heldContainer.getContainerExpiryTime()
-            + ", sessionDelay=" + sessionDelay);
+            + ", idleTimeoutMin=" + idleContainerTimeoutMin);
           releaseUnassignedContainers(
             Lists.newArrayList(heldContainer.container));
         } else {
@@ -688,7 +758,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
               LOG.info("Releasing held container as either there are pending but "
                 + " unmatched requests or this is not a session"
                 + ", containerId=" + heldContainer.container.getId()
-                + ", pendingTasks=" + !taskRequests.isEmpty()
+                + ", pendingTasks=" + taskRequests.size()
                 + ", isSession=" + appContext.isSession()
                 + ". isNew=" + isNew);
               releaseUnassignedContainers(
@@ -910,8 +980,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           if (heldContainer != null) {
             heldContainer.resetLocalityMatchLevel();
             long currentTime = System.currentTimeMillis();
-            if (sessionDelay > 0) {
-              heldContainer.setContainerExpiryTime(currentTime + sessionDelay);
+            if (idleContainerTimeoutMin > 0) {
+              heldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime));
             }
             assignedContainers = assignDelayedContainer(heldContainer);
           } else {
@@ -1265,9 +1335,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   
   private void pushNewContainerToDelayed(List<Container> containers){
     long expireTime = -1;
-    if (sessionDelay > 0) {
+    if (idleContainerTimeoutMin > 0) {
       long currentTime = System.currentTimeMillis();
-      expireTime = currentTime + sessionDelay;
+      expireTime = currentTime + idleContainerTimeoutMin;
     }
 
     synchronized (delayedContainerManager) {
@@ -1384,14 +1454,15 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       return false;
     }
     
-    if (topPendingTaskPriority.compareTo(containerPriority) > 0) {
+    if (topPendingTaskPriority.compareTo(containerPriority) > 0 && 
+        heldContainers.get(container.getId()).isNew()) {
       // if the next task to assign is higher priority than the container then 
       // dont assign this container to that task.
       // if task and container are equal priority - then its first use or reuse
       // within the same priority - safe to use
-      // if task is lower priority than container then its we use a container that
+      // if task is lower priority than container then if we use a container that
       // is no longer needed by higher priority tasks All those higher pri tasks 
-      // have been assigned resources - safe to use (first use or reuse)
+      // has been assigned resources - safe to use (first use or reuse)
       // if task is higher priority than container then we may end up using a 
       // container that was assigned by the RM for a lower priority pending task 
       // that will be assigned after this higher priority task is assigned. If we
@@ -1399,6 +1470,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // container to that task later on. However the RM has already assigned us 
       // all containers and is not going to give us new containers. We will get 
       // stuck for resources.
+      // the above applies for new containers. If a container has already been 
+      // re-used then this is not relevant
       return false;
     }
     
@@ -1550,8 +1623,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // TEZ-586 this is not match an actual rackLocal request unless honorLocality
       // is false. This method is useless if honorLocality=true
       if (!honorLocality) {
-        String location = RackResolver.resolve(container.getNodeId().getHost())
-          .getNetworkLocation();
+        String location = heldContainers.get(container.getId()).getRack();
         CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
             container, location, false);
         doBookKeepingForAssignedContainer(assigned, container, location,
@@ -1804,6 +1876,80 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     }
 
   }
+  
+  synchronized void determineMinHeldContainers() {
+    sessionMinHeldContainers.clear();
+    if (sessionNumMinHeldContainers <= 0) {
+      return;
+    }
+    
+    if (heldContainers.size() <= sessionNumMinHeldContainers) {
+      sessionMinHeldContainers.addAll(heldContainers.keySet());
+    }
+    
+    Map<String, AtomicInteger> rackHeldNumber = Maps.newHashMap();
+    Map<String, List<HeldContainer>> nodeHeldContainers = Maps.newHashMap();
+    for(HeldContainer heldContainer : heldContainers.values()) {
+      AtomicInteger count = rackHeldNumber.get(heldContainer.getRack());
+      if (count == null) {
+        count = new AtomicInteger(0);
+        rackHeldNumber.put(heldContainer.getRack(), count);
+      }
+      count.incrementAndGet();
+      List<HeldContainer> nodeContainers = nodeHeldContainers.get(heldContainer.getNode());
+      if (nodeContainers == null) {
+        nodeContainers = Lists.newLinkedList();
+        nodeHeldContainers.put(heldContainer.getNode(), nodeContainers);
+      }
+      nodeContainers.add(heldContainer);
+    }
+    Map<String, AtomicInteger> rackToHoldNumber = Maps.newHashMap();
+    for (String rack : rackHeldNumber.keySet()) {
+      rackToHoldNumber.put(rack, new AtomicInteger(0));
+    }
+    
+    // distribute evenly across nodes
+    // the loop assigns 1 container per rack over all racks
+    int containerCount = 0;
+    while (containerCount < sessionNumMinHeldContainers && !rackHeldNumber.isEmpty()) {
+      Iterator<Entry<String, AtomicInteger>> iter = rackHeldNumber.entrySet().iterator();
+      while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
+        Entry<String, AtomicInteger> entry = iter.next();
+        if (entry.getValue().decrementAndGet() >=0) {
+          containerCount++;
+          rackToHoldNumber.get(entry.getKey()).incrementAndGet();
+        } else {
+          iter.remove();
+        }
+      }
+    }
+    
+    // distribute containers evenly across nodes while not exceeding rack limit
+    // the loop assigns 1 container per node over all nodes
+    containerCount = 0;
+    while (containerCount < sessionNumMinHeldContainers && !nodeHeldContainers.isEmpty()) {
+      Iterator<Entry<String, List<HeldContainer>>> iter = nodeHeldContainers.entrySet().iterator();
+      while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
+        List<HeldContainer> nodeContainers = iter.next().getValue();
+        if (nodeContainers.isEmpty()) {
+          // node is empty. remove it.
+          iter.remove();
+          continue;
+        }
+        HeldContainer heldContainer = nodeContainers.remove(nodeContainers.size() - 1);
+        if (rackToHoldNumber.get(heldContainer.getRack()).decrementAndGet() >= 0) {
+          // rack can hold a container
+          containerCount++;
+          sessionMinHeldContainers.add(heldContainer.getContainer().getId());
+        } else {
+          // rack limit reached. remove node.
+          iter.remove();
+        }
+      }
+    }
+    
+    LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers");
+  }
 
   private class ContainerIterable implements Iterable<Container> {
 
@@ -1849,6 +1995,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     }
 
     Container container;
+    private String rack;
     private long nextScheduleTime;
     private Object firstContainerSignature;
     private LocalityMatchLevel localityMatchLevel;
@@ -1868,12 +2015,22 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       }
       this.localityMatchLevel = LocalityMatchLevel.NODE;
       this.containerExpiryTime = containerExpiryTime;
+      this.rack = RackResolver.resolve(container.getNodeId().getHost())
+          .getNetworkLocation();
     }
     
     boolean isNew() {
       return firstContainerSignature == null;
     }
     
+    String getRack() {
+      return this.rack;
+    }
+    
+    String getNode() {
+      return this.container.getNodeId().getHost();
+    }
+    
     int geNumAssignmentAttempts() {
       return numAssignmentAttempts;
     }

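The behavioral core of this file's change is getHeldContainerExpireTime(): instead of all idle containers expiring after one fixed sessionDelay and being released en masse, each container's expiry is drawn uniformly from [start + min, start + max], which is the "graceful reduction in the amount of idle resources held" described in the new javadoc; in session mode, determineMinHeldContainers() additionally exempts a rack- and node-balanced subset from release. A self-contained sketch of the randomized expiry, using the same commons-math3 RandomDataGenerator the patch imports (the wrapper class and demo values are illustrative, not part of the patch):

import org.apache.commons.math3.random.RandomDataGenerator;

public class ExpireTimeSketch {
  // Stand-ins for idleContainerTimeoutMin/Max (demo values, not the defaults).
  static final long MIN_TIMEOUT_MS = 1000l;
  static final long MAX_TIMEOUT_MS = 100000l;
  static final RandomDataGenerator random = new RandomDataGenerator();

  // Mirrors getHeldContainerExpireTime(): min == max (or min == -1) yields a
  // fixed expiry; min < max yields a uniform draw over the window.
  static long getExpireTime(long startTime) {
    long expireTime = startTime + MIN_TIMEOUT_MS;
    if (MIN_TIMEOUT_MS != -1 && MIN_TIMEOUT_MS < MAX_TIMEOUT_MS) {
      expireTime = random.nextLong(expireTime, startTime + MAX_TIMEOUT_MS);
    }
    return expireTime;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Each idle container gets a different expiry inside the window, so a
    // burst of simultaneously idle containers is released gradually.
    for (int i = 0; i < 5; ++i) {
      System.out.println("expiry in " + (getExpireTime(now) - now) + " ms");
    }
  }
}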
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/061dfe77/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index c5d20d7..2d3e7d8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -112,6 +112,7 @@ public class TestContainerReuse {
       TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
     conf.setLong(
       TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 3000l);
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     RackResolver.init(conf);
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
@@ -249,6 +250,7 @@ public class TestContainerReuse {
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1000l);
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     RackResolver.init(conf);
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
@@ -355,7 +357,7 @@ public class TestContainerReuse {
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
-    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     RackResolver.init(tezConf);
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
@@ -490,7 +492,7 @@ public class TestContainerReuse {
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
-    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     //Profile 3 tasks
     tezConf.set(TezConfiguration.TEZ_PROFILE_TASK_LIST, "v1[1,3,4]");
     tezConf.set(TezConfiguration.TEZ_PROFILE_JVM_OPTS, "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__");
@@ -679,7 +681,8 @@ public class TestContainerReuse {
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, true);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100l);
-    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, 10000l);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 1000l);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 1000l);
     RackResolver.init(tezConf);
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
@@ -779,7 +782,7 @@ public class TestContainerReuse {
     eventHandler.reset();
 
     LOG.info("Sleeping to ensure that the scheduling loop runs");
-    Thread.sleep(6000l);
+    Thread.sleep(3000l);
     verify(taskSchedulerEventHandler).taskAllocated(
       eq(ta12), any(Object.class), eq(container1));
 
@@ -788,6 +791,8 @@ public class TestContainerReuse {
       new AMSchedulerEventTAEnded(ta12, container1.getId(),
         TaskAttemptState.SUCCEEDED));
     drainableAppCallback.drain();
+    LOG.info("Sleeping to ensure that the scheduling loop runs");
+    Thread.sleep(3000l);
     verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
 
@@ -803,7 +808,9 @@ public class TestContainerReuse {
     tezConf.setLong(
       TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l);
     tezConf.setLong(
-      TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, 2000l);
+      TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 2000l);
+    tezConf.setInt(
+        TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1);
     RackResolver.init(tezConf);
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
@@ -910,9 +917,9 @@ public class TestContainerReuse {
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     LOG.info("Sleeping to ensure that the scheduling loop runs");
-    Thread.sleep(6000l);
-    verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
-    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+    Thread.sleep(3000l);
+    // container should not get released due to min held containers
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     taskScheduler.close();
     taskSchedulerEventHandler.close();
@@ -925,7 +932,7 @@ public class TestContainerReuse {
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, true);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
-    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, -1);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1);
     RackResolver.init(tezConf);
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/061dfe77/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 2e94dc0..bd53f87 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -41,6 +41,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -79,6 +80,8 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Sets;
+
 
 public class TestTaskScheduler {
 
@@ -516,7 +519,7 @@ public class TestTaskScheduler {
     // to match all in the same pass
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
     // to release immediately after deallocate
-    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, 0);
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     scheduler.init(conf);
     drainableAppCallback.drain();
 
@@ -986,6 +989,217 @@ public class TestTaskScheduler {
     verify(mockRMClient).stop();
     scheduler.close();
   }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout=5000)
+  public void testTaskSchedulerDetermineMinHeldContainers() throws Exception {
+    RackResolver.init(new YarnConfiguration());
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+    AppContext mockAppContext = mock(AppContext.class);
+    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+                                                  mock(TezAMRMClientAsync.class);
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+    TaskSchedulerWithDrainableAppCallback scheduler =
+      new TaskSchedulerWithDrainableAppCallback(
+        mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
+        appUrl, mockRMClient, mockAppContext);
+
+    Configuration conf = new Configuration();
+    scheduler.init(conf);
+    RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
+    Resource mockMaxResource = mock(Resource.class);
+    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+    when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
+        mockMaxResource);
+    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+    when(
+        mockRMClient.registerApplicationMaster(anyString(), anyInt(),
+            anyString())).thenReturn(mockRegResponse);
+    Resource mockClusterResource = mock(Resource.class);
+    when(mockRMClient.getAvailableResources()).thenReturn(mockClusterResource);
+
+    scheduler.start();
+    
+    String rack1 = "r1";
+    String rack2 = "r2";
+    String rack3 = "r3";
+    String node1Rack1 = "n1r1";
+    String node2Rack1 = "n2r1";
+    String node1Rack2 = "n1r2";
+    String node2Rack2 = "n2r2";
+    String node1Rack3 = "n1r3";
+    ApplicationAttemptId appId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0);
+
+    ContainerId mockCId1 = ContainerId.newInstance(appId, 0);
+    HeldContainer hc1 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    when(hc1.getNode()).thenReturn(node1Rack1);
+    when(hc1.getRack()).thenReturn(rack1);
+    when(hc1.getContainer().getId()).thenReturn(mockCId1);
+    ContainerId mockCId2 = ContainerId.newInstance(appId, 1);
+    HeldContainer hc2 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    when(hc2.getNode()).thenReturn(node2Rack1);
+    when(hc2.getRack()).thenReturn(rack1);
+    when(hc2.getContainer().getId()).thenReturn(mockCId2);
+    ContainerId mockCId3 = ContainerId.newInstance(appId, 2);
+    HeldContainer hc3 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    when(hc3.getNode()).thenReturn(node1Rack1);
+    when(hc3.getRack()).thenReturn(rack1);
+    when(hc3.getContainer().getId()).thenReturn(mockCId3);
+    ContainerId mockCId4 = ContainerId.newInstance(appId, 3);
+    HeldContainer hc4 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    when(hc4.getNode()).thenReturn(node2Rack1);
+    when(hc4.getRack()).thenReturn(rack1);
+    when(hc4.getContainer().getId()).thenReturn(mockCId4);
+    ContainerId mockCId5 = ContainerId.newInstance(appId, 4);
+    HeldContainer hc5 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    when(hc5.getNode()).thenReturn(node1Rack2);
+    when(hc5.getRack()).thenReturn(rack2);
+    when(hc5.getContainer().getId()).thenReturn(mockCId5);
+    ContainerId mockCId6 = ContainerId.newInstance(appId, 5);
+    HeldContainer hc6 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    when(hc6.getNode()).thenReturn(node2Rack2);
+    when(hc6.getRack()).thenReturn(rack2);
+    when(hc6.getContainer().getId()).thenReturn(mockCId6);
+    ContainerId mockCId7 = ContainerId.newInstance(appId, 6);
+    HeldContainer hc7 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    when(hc7.getNode()).thenReturn(node1Rack3);
+    when(hc7.getRack()).thenReturn(rack3);
+    when(hc7.getContainer().getId()).thenReturn(mockCId7);
+    
+    scheduler.heldContainers.put(mockCId1, hc1);
+    scheduler.heldContainers.put(mockCId2, hc2);
+    scheduler.heldContainers.put(mockCId3, hc3);
+    scheduler.heldContainers.put(mockCId4, hc4);
+    scheduler.heldContainers.put(mockCId5, hc5);
+    scheduler.heldContainers.put(mockCId6, hc6);
+    scheduler.heldContainers.put(mockCId7, hc7);
+    
+    // test empty case
+    scheduler.sessionNumMinHeldContainers = 0;
+    scheduler.determineMinHeldContainers();
+    Assert.assertEquals(0, scheduler.sessionMinHeldContainers.size());
+    
+    // test min >= held
+    scheduler.sessionNumMinHeldContainers = 7;
+    scheduler.determineMinHeldContainers();
+    Assert.assertEquals(7, scheduler.sessionMinHeldContainers.size());
+    
+    // test min < held
+    scheduler.sessionNumMinHeldContainers = 5;
+    scheduler.determineMinHeldContainers();
+    Assert.assertEquals(5, scheduler.sessionMinHeldContainers.size());
+    
+    Set<HeldContainer> heldContainers = Sets.newHashSet();
+    for (ContainerId cId : scheduler.sessionMinHeldContainers) {
+      heldContainers.add(scheduler.heldContainers.get(cId));
+    }
+    Set<String> racks = Sets.newHashSet();
+    Set<String> nodes = Sets.newHashSet();
+    for (HeldContainer hc : heldContainers) {
+      nodes.add(hc.getNode());
+      racks.add(hc.getRack());
+    }
+    // 1 container from each node in rack1 and rack2. 1 container from rack3.
+    // covers not enough containers in rack (rack 3)
+    // covers just enough containers in rack (rack 2)
+    // covers more than enough containers in rack (rack 1)
+    Assert.assertEquals(5, nodes.size());
+    Assert.assertTrue(nodes.contains(node1Rack1) && nodes.contains(node2Rack1) &&
+        nodes.contains(node1Rack2) && nodes.contains(node2Rack2) &&
+        nodes.contains(node1Rack3));
+    Assert.assertEquals(3, racks.size());
+    Assert.assertTrue(racks.contains(rack1) && racks.contains(rack2) &&
+        racks.contains(rack3));
+    
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.stop();
+    scheduler.close();
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout=5000)
+  public void testTaskSchedulerRandomReuseExpireTime() throws Exception {
+    RackResolver.init(new YarnConfiguration());
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+    AppContext mockAppContext = mock(AppContext.class);
+    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+                                                  mock(TezAMRMClientAsync.class);
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+    TaskSchedulerWithDrainableAppCallback scheduler1 =
+      new TaskSchedulerWithDrainableAppCallback(
+        mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
+        appUrl, mockRMClient, mockAppContext);
+    TaskSchedulerWithDrainableAppCallback scheduler2 =
+        new TaskSchedulerWithDrainableAppCallback(
+          mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
+          appUrl, mockRMClient, mockAppContext);
+
+    long minTime = 1000l;
+    long maxTime = 100000l;
+    Configuration conf1 = new Configuration();
+    conf1.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime);
+    conf1.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, minTime);
+    scheduler1.init(conf1);
+    Configuration conf2 = new Configuration();
+    conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime);
+    conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, maxTime);
+    scheduler2.init(conf2);
+
+    RegisterApplicationMasterResponse mockRegResponse =
+                                mock(RegisterApplicationMasterResponse.class);
+    Resource mockMaxResource = mock(Resource.class);
+    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+    when(mockRegResponse.getMaximumResourceCapability()).
+                                                   thenReturn(mockMaxResource);
+    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+    when(mockRMClient.
+          registerApplicationMaster(anyString(), anyInt(), anyString())).
+                                                   thenReturn(mockRegResponse);
+    Resource mockClusterResource = mock(Resource.class);
+    when(mockRMClient.getAvailableResources()).
+                                              thenReturn(mockClusterResource);
+
+    scheduler1.start();
+    scheduler2.start();
+    
+    // when min == max the expire time is always min
+    for (int i=0; i<10; ++i) {
+      Assert.assertEquals(minTime, scheduler1.getHeldContainerExpireTime(0));
+    }
+    
+    long lastExpireTime = 0;
+    // when min < max the expire time is random in between min and max
+    for (int i=0; i<10; ++i) {
+      long currExpireTime = scheduler2.getHeldContainerExpireTime(0);
+      Assert.assertTrue(
+          "min: " + minTime + " curr: " + currExpireTime + " max: " + maxTime,
+          (minTime <= currExpireTime && currExpireTime <= maxTime));
+      Assert.assertNotEquals(lastExpireTime, currExpireTime);
+      lastExpireTime = currExpireTime;
+    }
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler1.stop();
+    scheduler1.close();
+    scheduler2.stop();
+    scheduler2.close();
+  }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/061dfe77/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index e6e80c1..e476a88 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -199,7 +199,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
     // is the same filesystem as the one used for Input/Output.
     TezClient tezSession = null;
     // needs session or else TaskScheduler does not hold onto containers
-    tezSession = new TezClient("broadcastAndOneToOneExample", tezConf, true);
+    tezSession = new TezClient("broadcastAndOneToOneExample", tezConf);
     tezSession.start();
 
     DAGClient dagClient = null;

