hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sa...@apache.org
Subject svn commit: r1598197 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ hadoop-yarn/hadoop-yarn-server/hadoop-y...
Date Thu, 29 May 2014 04:01:24 GMT
Author: sandy
Date: Thu May 29 04:01:24 2014
New Revision: 1598197

URL: http://svn.apache.org/r1598197
Log:
YARN-596. Use scheduling policies throughout the queue hierarchy to decide which containers to preempt (Wei Yan via Sandy Ryza)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu May 29 04:01:24 2014
@@ -114,6 +114,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2107. Refactored timeline classes into o.a.h.y.s.timeline package. (Vinod
     Kumar Vavilapalli via zjshen)
 
+    YARN-596. Use scheduling policies throughout the queue hierarchy to decide
+    which containers to preempt (Wei Yan via Sandy Ryza)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Thu May 29 04:01:24 2014
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,8 +33,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -58,6 +58,8 @@ public class AppSchedulable extends Sche
   private Priority priority;
   private ResourceWeights resourceWeights;
 
+  private RMContainerComparator comparator = new RMContainerComparator();
+
   public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
     this.scheduler = scheduler;
     this.app = app;
@@ -111,7 +113,10 @@ public class AppSchedulable extends Sche
 
   @Override
   public Resource getResourceUsage() {
-    return app.getCurrentConsumption();
+    // getPreemptedResources() always returns zero, except during
+    // a preemption round
+    return Resources.subtract(app.getCurrentConsumption(),
+        app.getPreemptedResources());
   }
 
 
@@ -384,6 +389,27 @@ public class AppSchedulable extends Sche
   }
   
   /**
+   * Preempt a running container according to the priority
+   */
+  @Override
+  public RMContainer preemptContainer() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("App " + getName() + " is going to preempt a running " +
+          "container");
+    }
+
+    RMContainer toBePreempted = null;
+    for (RMContainer container : app.getLiveContainers()) {
+      if (! app.getPreemptionContainers().contains(container) &&
+          (toBePreempted == null ||
+              comparator.compare(toBePreempted, container) > 0)) {
+        toBePreempted = container;
+      }
+    }
+    return toBePreempted;
+  }
+
+  /**
    * Whether this app has containers requests that could be satisfied on the
    * given node, if the node had full space.
    */
@@ -407,4 +433,17 @@ public class AppSchedulable extends Sche
         Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
             anyRequest.getCapability(), node.getRMNode().getTotalCapability());
   }
+
+  static class RMContainerComparator implements Comparator<RMContainer>,
+      Serializable {
+    @Override
+    public int compare(RMContainer c1, RMContainer c2) {
+      int ret = c1.getContainer().getPriority().compareTo(
+          c2.getContainer().getPriority());
+      if (ret == 0) {
+        return c2.getContainerId().compareTo(c1.getContainerId());
+      }
+      return ret;
+      }
+    }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Thu May 29 04:01:24 2014
@@ -33,10 +33,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 
 @Private
 @Unstable
@@ -209,6 +209,36 @@ public class FSLeafQueue extends FSQueue
   }
 
   @Override
+  public RMContainer preemptContainer() {
+    RMContainer toBePreempted = null;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Queue " + getName() + " is going to preempt a container " +
+          "from its applications.");
+    }
+
+    // If this queue is not over its fair share, reject
+    if (!preemptContainerPreCheck()) {
+      return toBePreempted;
+    }
+
+    // Choose the app that is most over fair share
+    Comparator<Schedulable> comparator = policy.getComparator();
+    AppSchedulable candidateSched = null;
+    for (AppSchedulable sched : runnableAppScheds) {
+      if (candidateSched == null ||
+          comparator.compare(sched, candidateSched) > 0) {
+        candidateSched = sched;
+      }
+    }
+
+    // Preempt from the selected app
+    if (candidateSched != null) {
+      toBePreempted = candidateSched.preemptContainer();
+    }
+    return toBePreempted;
+  }
+
+  @Override
   public List<FSQueue> getChildQueues() {
     return new ArrayList<FSQueue>(1);
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Thu May 29 04:01:24 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 
@@ -157,6 +159,32 @@ public class FSParentQueue extends FSQue
   }
 
   @Override
+  public RMContainer preemptContainer() {
+    RMContainer toBePreempted = null;
+
+    // If this queue is not over its fair share, reject
+    if (!preemptContainerPreCheck()) {
+      return toBePreempted;
+    }
+
+    // Find the childQueue which is most over fair share
+    FSQueue candidateQueue = null;
+    Comparator<Schedulable> comparator = policy.getComparator();
+    for (FSQueue queue : childQueues) {
+      if (candidateQueue == null ||
+          comparator.compare(queue, candidateQueue) > 0) {
+        candidateQueue = queue;
+      }
+    }
+
+    // Let the selected queue choose which of its containers to preempt
+    if (candidateQueue != null) {
+      toBePreempted = candidateQueue.preemptContainer();
+    }
+    return toBePreempted;
+  }
+
+  @Override
   public List<FSQueue> getChildQueues() {
     return childQueues;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Thu May 29 04:01:24 2014
@@ -187,4 +187,17 @@ public abstract class FSQueue extends Sc
     }
     return true;
   }
+
+  /**
+   * Helper method to check if the queue should preempt containers
+   *
+   * @return true if check passes (can preempt) or false otherwise
+   */
+  protected boolean preemptContainerPreCheck() {
+    if (this == scheduler.getQueueManager().getRootQueue()) {
+      return true;
+    }
+    return parent.getPolicy()
+        .checkIfUsageOverFairShare(getResourceUsage(), getFairShare());
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu May 29 04:01:24 2014
@@ -59,6 +59,8 @@ public class FSSchedulerApp extends Sche
   private AppSchedulable appSchedulable;
 
   final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
+  private Resource preemptedResources = Resources.createResource(0);
   
   public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
@@ -316,6 +318,7 @@ public class FSSchedulerApp extends Sche
   public void addPreemption(RMContainer container, long time) {
     assert preemptionMap.get(container) == null;
     preemptionMap.put(container, time);
+    Resources.addTo(preemptedResources, container.getAllocatedResource());
   }
 
   public Long getContainerPreemptionTime(RMContainer container) {
@@ -330,4 +333,20 @@ public class FSSchedulerApp extends Sche
   public FSLeafQueue getQueue() {
     return (FSLeafQueue)super.getQueue();
   }
+
+  public Resource getPreemptedResources() {
+    return preemptedResources;
+  }
+
+  public void resetPreemptedResources() {
+    preemptedResources = Resources.createResource(0);
+    for (RMContainer container : getPreemptionContainers()) {
+      Resources.addTo(preemptedResources, container.getAllocatedResource());
+    }
+  }
+
+  public void clearPreemptedResources() {
+    preemptedResources.setMemory(0);
+    preemptedResources.setVirtualCores(0);
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu May 29 04:01:24 2014
@@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -337,94 +334,78 @@ public class FairScheduler extends
     }
     if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
         Resources.none())) {
-      preemptResources(queueMgr.getLeafQueues(), resToPreempt);
+      preemptResources(resToPreempt);
     }
   }
 
   /**
-   * Preempt a quantity of resources from a list of QueueSchedulables. The
-   * policy for this is to pick apps from queues that are over their fair share,
-   * but make sure that no queue is placed below its fair share in the process.
-   * We further prioritize preemption by choosing containers with lowest
-   * priority to preempt.
+   * Preempt a quantity of resources. Each round, we start from the root queue,
+   * level-by-level, until choosing a candidate application.
+   * The policy for prioritizing preemption for each queue depends on its
+   * SchedulingPolicy: (1) fairshare/DRF, choose the child Schedulable that is
+   * most over its fair share; (2) FIFO, choose the child Schedulable that was
+   * launched most recently.
+   * Inside each application, we further prioritize preemption by choosing
+   * containers with lowest priority to preempt.
+   * We make sure that no queue is placed below its fair share in the process.
    */
-  protected void preemptResources(Collection<FSLeafQueue> scheds,
-      Resource toPreempt) {
-    if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
+  protected void preemptResources(Resource toPreempt) {
+    if (Resources.equals(toPreempt, Resources.none())) {
       return;
     }
 
-    Map<RMContainer, FSSchedulerApp> apps = 
-        new HashMap<RMContainer, FSSchedulerApp>();
-    Map<RMContainer, FSLeafQueue> queues = 
-        new HashMap<RMContainer, FSLeafQueue>();
-
-    // Collect running containers from over-scheduled queues
-    List<RMContainer> runningContainers = new ArrayList<RMContainer>();
-    for (FSLeafQueue sched : scheds) {
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
-          for (RMContainer c : as.getApp().getLiveContainers()) {
-            runningContainers.add(c);
-            apps.put(c, as.getApp());
-            queues.put(c, sched);
-          }
-        }
-      }
-    }
-
-    // Sort containers into reverse order of priority
-    Collections.sort(runningContainers, new Comparator<RMContainer>() {
-      public int compare(RMContainer c1, RMContainer c2) {
-        int ret = c1.getContainer().getPriority().compareTo(
-            c2.getContainer().getPriority());
-        if (ret == 0) {
-          return c2.getContainerId().compareTo(c1.getContainerId());
-        }
-        return ret;
-      }
-    });
-    
     // Scan down the list of containers we've already warned and kill them
     // if we need to.  Remove any containers from the list that we don't need
     // or that are no longer running.
     Iterator<RMContainer> warnedIter = warnedContainers.iterator();
-    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
     while (warnedIter.hasNext()) {
       RMContainer container = warnedIter.next();
-      if (container.getState() == RMContainerState.RUNNING &&
+      if ((container.getState() == RMContainerState.RUNNING ||
+              container.getState() == RMContainerState.ALLOCATED) &&
           Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
               toPreempt, Resources.none())) {
-        warnOrKillContainer(container, apps.get(container), queues.get(container));
-        preemptedThisRound.add(container);
+        warnOrKillContainer(container);
         Resources.subtractFrom(toPreempt, container.getContainer().getResource());
       } else {
         warnedIter.remove();
       }
     }
 
-    // Scan down the rest of the containers until we've preempted enough, making
-    // sure we don't preempt too many from any queue
-    Iterator<RMContainer> runningIter = runningContainers.iterator();
-    while (runningIter.hasNext() &&
-        Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
-            toPreempt, Resources.none())) {
-      RMContainer container = runningIter.next();
-      FSLeafQueue sched = queues.get(container);
-      if (!preemptedThisRound.contains(container) &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
-              sched.getResourceUsage(), sched.getFairShare())) {
-        warnOrKillContainer(container, apps.get(container), sched);
-        
-        warnedContainers.add(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+    try {
+      // Reset preemptedResources for each app
+      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+        for (AppSchedulable app : queue.getRunnableAppSchedulables()) {
+          app.getApp().resetPreemptedResources();
+        }
+      }
+
+      while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
+          toPreempt, Resources.none())) {
+        RMContainer container =
+            getQueueManager().getRootQueue().preemptContainer();
+        if (container == null) {
+          break;
+        } else {
+          warnOrKillContainer(container);
+          warnedContainers.add(container);
+          Resources.subtractFrom(
+              toPreempt, container.getContainer().getResource());
+        }
+      }
+    } finally {
+      // Clear preemptedResources for each app
+      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+        for (AppSchedulable app : queue.getRunnableAppSchedulables()) {
+          app.getApp().clearPreemptedResources();
+        }
       }
     }
   }
   
-  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
-      FSLeafQueue queue) {
+  private void warnOrKillContainer(RMContainer container) {
+    ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+    FSSchedulerApp app = getSchedulerApp(appAttemptId);
+    FSLeafQueue queue = app.getQueue();
     LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
         "res=" + container.getContainer().getResource() +
         ") from queue " + queue.getName());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Thu May 29 04:01:24 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -100,6 +101,11 @@ public abstract class Schedulable {
    */
   public abstract Resource assignContainer(FSSchedulerNode node);
 
+  /**
+   * Preempt a container from this Schedulable if possible.
+   */
+  public abstract RMContainer preemptContainer();
+
   /** Assign a fair share to this Schedulable. */
   public void setFairShare(Resource fairShare) {
     this.fairShare = fairShare;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Thu May 29 04:01:24 2014
@@ -139,4 +139,14 @@ public abstract class SchedulingPolicy {
    */
   public abstract void computeShares(
       Collection<? extends Schedulable> schedulables, Resource totalResources);
+
+  /**
+   * Check if the resource usage is over the fair share under this policy
+   *
+   * @param usage {@link Resource} the resource usage
+   * @param fairShare {@link Resource} the fair share
+   * @return true if check passes (is over) or false otherwise
+   */
+  public abstract boolean checkIfUsageOverFairShare(
+      Resource usage, Resource fairShare);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Thu May 29 04:01:24 2014
@@ -70,6 +70,11 @@ public class DominantResourceFairnessPol
   }
   
   @Override
+  public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+    return !Resources.fitsIn(usage, fairShare);
+  }
+
+  @Override
   public void initialize(Resource clusterCapacity) {
     comparator.setClusterCapacity(clusterCapacity);
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Thu May 29 04:01:24 2014
@@ -120,6 +120,11 @@ public class FairSharePolicy extends Sch
   }
 
   @Override
+  public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+    return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_ANY;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Thu May 29 04:01:24 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.re
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -89,6 +88,13 @@ public class FifoPolicy extends Scheduli
   }
 
   @Override
+  public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+    throw new UnsupportedOperationException(
+        "FifoPolicy doesn't support checkIfUsageOverFairshare operation, " +
+            "as FifoPolicy only works for FSLeafQueue.");
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Thu May 29 04:01:24 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -84,6 +85,11 @@ public class FakeSchedulable extends Sch
   }
 
   @Override
+  public RMContainer preemptContainer() {
+    return null;
+  }
+
+  @Override
   public Resource getDemand() {
     return null;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu May 29 04:01:24 2014
@@ -1029,13 +1029,13 @@ public class TestFairScheduler extends F
 
   @Test (timeout = 5000)
   /**
-   * Make sure containers are chosen to be preempted in the correct order. Right
-   * now this means decreasing order of priority.
+   * Make sure containers are chosen to be preempted in the correct order.
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
     conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
-    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); 
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
     
     MockClock clock = new MockClock();
     scheduler.setClock(clock);
@@ -1052,7 +1052,7 @@ public class TestFairScheduler extends F
     out.println("<queue name=\"queueC\">");
     out.println("<weight>.25</weight>");
     out.println("</queue>");
-    out.println("<queue name=\"queueD\">");
+    out.println("<queue name=\"default\">");
     out.println("<weight>.25</weight>");
     out.println("</queue>");
     out.println("</allocations>");
@@ -1060,133 +1060,132 @@ public class TestFairScheduler extends F
     
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
-    // Create four nodes
+    // Create two nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
             "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
-    RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
-            "127.0.0.3");
-    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
-    scheduler.handle(nodeEvent3);
-
-
-    // Queue A and B each request three containers
+    // Queue A and B each request two applications
     ApplicationAttemptId app1 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+        createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
     ApplicationAttemptId app2 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
-    ApplicationAttemptId app3 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+        createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
 
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
     ApplicationAttemptId app4 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
-    ApplicationAttemptId app5 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
-    ApplicationAttemptId app6 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+        createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
 
     scheduler.update();
 
+    scheduler.getQueueManager().getLeafQueue("queueA", true)
+        .setPolicy(SchedulingPolicy.parse("fifo"));
+    scheduler.getQueueManager().getLeafQueue("queueB", true)
+        .setPolicy(SchedulingPolicy.parse("fair"));
+
     // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 2; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+    for (int i = 0; i < 4; i++) {
       scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
       scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
     }
 
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size());
-
-    // Now new requests arrive from queues C and D
-    ApplicationAttemptId app7 =
-        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
-    ApplicationAttemptId app8 =
-        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
-    ApplicationAttemptId app9 =
-        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
-    ApplicationAttemptId app10 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
-    ApplicationAttemptId app11 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
-    ApplicationAttemptId app12 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
-
-    scheduler.update();
-
-    // We should be able to claw back one container from A and B each.
-    // Make sure it is lowest priority container.
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    
-    // First verify we are adding containers to preemption list for the application
-    assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(),
-                                     scheduler.getSchedulerApp(app3).getPreemptionContainers()));
-    assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(),
-                                     scheduler.getSchedulerApp(app6).getPreemptionContainers()));
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+
+    // Now new requests arrive from queueC and default
+    createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+    scheduler.update();
+
+    // We should be able to claw back one container from queueA and queueB each.
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+    // First verify we are adding containers to preemption list for the app.
+    // For queueA (fifo), app2 is selected.
+    // For queueB (fair), app4 is selected.
+    assertTrue("App2 should have container to be preempted",
+        !Collections.disjoint(
+            scheduler.getSchedulerApp(app2).getLiveContainers(),
+            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App4 should have container to be preempted",
+        !Collections.disjoint(
+            scheduler.getSchedulerApp(app2).getLiveContainers(),
+            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
 
     // Pretend 15 seconds have passed
     clock.tick(15);
 
     // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
 
     // At this point the containers should have been killed (since we are not simulating AM)
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+    // Inside each app, containers are sorted according to their priorities.
+    // Containers with priority 4 are preempted for app2 and app4.
+    Set<RMContainer> set = new HashSet<RMContainer>();
+    for (RMContainer container :
+        scheduler.getSchedulerApp(app2).getLiveContainers()) {
+      if (container.getAllocatedPriority().getPriority() == 4) {
+        set.add(container);
+      }
+    }
+    for (RMContainer container :
+        scheduler.getSchedulerApp(app4).getLiveContainers()) {
+      if (container.getAllocatedPriority().getPriority() == 4) {
+        set.add(container);
+      }
+    }
+    assertTrue("Containers with priority=4 in app2 and app4 should be " +
+        "preempted.", set.isEmpty());
 
     // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
 
     // Pretend 15 seconds have passed
     clock.tick(15);
 
     // We should be able to claw back another container from A and B each.
-    // Make sure it is lowest priority container.
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
-    
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    // For queueA (fifo), continue preempting from app2.
+    // For queueB (fair), even though app4 has a lowest priority container with
+    // p=4, it still preempts from app3, as app3 is the most over its fair share.
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
     assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
     assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
 
     // Now A and B are below fair share, so preemption shouldn't do anything
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    assertTrue("App1 should have no container to be preempted",
+        scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
+    assertTrue("App2 should have no container to be preempted",
+        scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
+    assertTrue("App3 should have no container to be preempted",
+        scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
+    assertTrue("App4 should have no container to be preempted",
+        scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
   }
 
   @Test (timeout = 5000)

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java Thu May 29 04:01:24 2014
@@ -35,10 +35,8 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.Collection;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@@ -51,8 +49,7 @@ public class TestFairSchedulerPreemption
     public int lastPreemptMemory = -1;
 
     @Override
-    protected void preemptResources(
-        Collection<FSLeafQueue> scheds, Resource toPreempt) {
+    protected void preemptResources(Resource toPreempt) {
       lastPreemptMemory = toPreempt.getMemory();
     }
 



Mime
View raw message