hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1156037 [1/2] - in /hadoop/common/branches/MR-279/mapreduce: ./ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache...
Date Wed, 10 Aug 2011 06:03:12 GMT
Author: acmurthy
Date: Wed Aug 10 06:03:11 2011
New Revision: 1156037

URL: http://svn.apache.org/viewvc?rev=1156037&view=rev
Log:
MAPREDUCE-2782. Unit tests for CapacityScheduler.

Added:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
Modified:
    hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java

Modified: hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/CHANGES.txt?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/MR-279/mapreduce/CHANGES.txt Wed Aug 10 06:03:11 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    MAPREDUCE-2782. Unit tests for CapacityScheduler. (acmurthy) 
+    
     Fix for running ant targets to use the right set of common/test jars 
     (gkesavan via mahadev)
     

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Aug 10 06:03:11 2011
@@ -212,7 +212,7 @@ public class ResourceTrackerService exte
       this.nmLivelinessMonitor.receivedPing(nodeId);
 
       // 2. Check if it's a valid (i.e. not excluded) node
-      if (!this.nodesListManager.isValidNode(rmNode.getNodeHostName())) {
+      if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
         LOG.info("Disallowed NodeManager nodeId: " + nodeId +  
             " hostname: " + rmNode.getNodeAddress());
         throw new IOException("Disallowed NodeManager nodeId: " + 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Wed Aug 10 06:03:11 2011
@@ -48,7 +48,7 @@ public interface RMNode {
    * the hostname of this node
    * @return hostname of this node
    */
-  public String getNodeHostName();
+  public String getHostName();
   
   /**
    * the command port for this node

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Aug 10 06:03:11 2011
@@ -168,7 +168,7 @@ public class RMNodeImpl implements RMNod
   }
 
   @Override
-  public String getNodeHostName() {
+  public String getHostName() {
     return hostName;
   }
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Aug 10 06:03:11 2011
@@ -73,10 +73,11 @@ public class AppSchedulingInfo {
   boolean pending = true; // for app metrics
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
-      String queueName, String user, ApplicationStore store) {
+      String user, Queue queue, ApplicationStore store) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
-    this.queueName = queueName;
+    this.queue = queue;
+    this.queueName = queue.getQueueName();
     this.user = user;
     this.store = store;
   }
@@ -132,8 +133,10 @@ public class AppSchedulingInfo {
       ResourceRequest lastRequest = null;
 
       if (hostName.equals(RMNode.ANY)) {
-        LOG.debug("update:" + " application=" + applicationId + " request="
-            + request);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("update:" + " application=" + applicationId + " request="
+              + request);
+        }
         updatePendingResources = true;
       }
 
@@ -148,7 +151,6 @@ public class AppSchedulingInfo {
       }
 
       asks.put(hostName, request);
-
       if (updatePendingResources) {
         int lastRequestContainers = lastRequest != null ? lastRequest
             .getNumContainers() : 0;
@@ -200,7 +202,7 @@ public class AppSchedulingInfo {
    */
   synchronized public void allocate(NodeType type, SchedulerNode node,
       Priority priority, ResourceRequest request, Container container) {
-    if (type == NodeType.DATA_LOCAL) {
+    if (type == NodeType.NODE_LOCAL) {
       allocateNodeLocal(node, priority, request, container);
     } else if (type == NodeType.RACK_LOCAL) {
       allocateRackLocal(node, priority, request, container);
@@ -234,7 +236,7 @@ public class AppSchedulingInfo {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
-      this.requests.get(priority).remove(node.getNodeAddress());
+      this.requests.get(priority).remove(node.getHostName());
     }
 
     ResourceRequest rackLocalRequest = requests.get(priority).get(

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java Wed Aug 10 06:03:11 2011
@@ -22,7 +22,7 @@ package org.apache.hadoop.yarn.server.re
  * Resource classification.
  */
 public enum NodeType {
-  DATA_LOCAL,
+  NODE_LOCAL,
   RACK_LOCAL,
   OFF_SWITCH
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java Wed Aug 10 06:03:11 2011
@@ -8,6 +8,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -19,6 +21,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -54,16 +57,17 @@ public class SchedulerApp {
   
   Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
 
-  final Resource currentReservation = recordFactory
+  Resource currentReservation = recordFactory
       .newRecordInstance(Resource.class);
 
   private final RMContext rmContext;
-  public SchedulerApp(RMContext rmContext, 
-      AppSchedulingInfo application, Queue queue) {
+  public SchedulerApp(ApplicationAttemptId applicationAttemptId, 
+      String user, Queue queue, 
+      RMContext rmContext, ApplicationStore store) {
     this.rmContext = rmContext;
-    this.appSchedulingInfo = application;
+    this.appSchedulingInfo = 
+        new AppSchedulingInfo(applicationAttemptId, user, queue, store);
     this.queue = queue;
-    application.setQueue(queue);
   }
 
   public ApplicationId getApplicationId() {
@@ -186,9 +190,11 @@ public class SchedulerApp {
         new RMContainerEvent(container.getId(), RMContainerEventType.START));
 
     Resources.addTo(currentConsumption, container.getResource());
-    LOG.debug("allocate: applicationId=" + container.getId().getAppId()
-        + " container=" + container.getId() + " host="
-        + container.getNodeId().toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("allocate: applicationId=" + container.getId().getAppId()
+          + " container=" + container.getId() + " host="
+          + container.getNodeId().getHost() + " type=" + type);
+    }
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);
@@ -272,15 +278,28 @@ public class SchedulerApp {
         this.reservedContainers.get(priority);
     return (reservedContainers == null) ? 0 : reservedContainers.size();
   }
+  
+  /**
+   * Get total current reservations.
+   * Used only by unit tests
+   * @return total current reservations
+   */
+  @Stable
+  @Private
+  public synchronized Resource getCurrentReservation() {
+    return currentReservation;
+  }
 
   public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
       RMContainer rmContainer, Container container) {
     // Create RMContainer if necessary
     if (rmContainer == null) {
-        rmContainer = 
-            new RMContainerImpl(container, getApplicationAttemptId(), 
-                node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
-                rmContext.getContainerAllocationExpirer());
+      rmContainer = 
+          new RMContainerImpl(container, getApplicationAttemptId(), 
+              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
+              rmContext.getContainerAllocationExpirer());
+        
+      Resources.addTo(currentReservation, container.getResource());
     }
     rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
         container.getResource(), node.getNodeID(), priority));
@@ -293,13 +312,11 @@ public class SchedulerApp {
     }
     reservedContainers.put(node.getNodeID(), rmContainer);
     
-    Resources.add(currentReservation, container.getResource());
-    
     LOG.info("Application " + getApplicationId() 
         + " reserved container " + rmContainer
         + " on node " + node + ", currently has " + reservedContainers.size()
         + " at priority " + priority 
-        + "; currentReservation " + currentReservation);
+        + "; currentReservation " + currentReservation.getMemory());
     
     return rmContainer;
   }
@@ -313,7 +330,7 @@ public class SchedulerApp {
     }
     
     Resource resource = reservedContainer.getContainer().getResource();
-    Resources.subtract(currentReservation, resource);
+    Resources.subtractFrom(currentReservation, resource);
 
     LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
         + node + ", currently has " + reservedContainers.size() + " at priority "
@@ -341,7 +358,10 @@ public class SchedulerApp {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = 
         Math.max(this.getResourceRequests(priority).size() - 1, 0);
-    return ((float) requiredResources / clusterNodes);
+    
+    // waitFactor can't be more than '1' 
+    // i.e. no point skipping more than clustersize opportunities
+    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
   }
 
   public synchronized List<RMContainer> getAllReservedContainers() {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Wed Aug 10 06:03:11 2011
@@ -1,9 +1,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,7 +36,7 @@ public class SchedulerNode {
   
   /* set of containers that are allocated containers */
   private final Map<ContainerId, RMContainer> launchedContainers = 
-    new TreeMap<ContainerId, RMContainer>();
+    new HashMap<ContainerId, RMContainer>();
   
   private final RMNode rmNode;
 
@@ -59,8 +59,8 @@ public class SchedulerNode {
     return this.rmNode.getHttpAddress();
   }
 
-  public String getNodeAddress() {
-    return this.rmNode.getNodeAddress();
+  public String getHostName() {
+    return this.rmNode.getHostName();
   }
 
   public String getRackName() {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Aug 10 06:03:11 2011
@@ -83,7 +83,7 @@ implements ResourceScheduler, CapacitySc
   private final static List<Container> EMPTY_CONTAINER_LIST = 
     new ArrayList<Container>();
 
-  private final Comparator<Queue> queueComparator = new Comparator<Queue>() {
+  static final Comparator<Queue> queueComparator = new Comparator<Queue>() {
     @Override
     public int compare(Queue q1, Queue q2) {
       if (q1.getUtilization() < q2.getUtilization()) {
@@ -96,7 +96,7 @@ implements ResourceScheduler, CapacitySc
     }
   };
 
-  private final Comparator<SchedulerApp> applicationComparator = 
+  static final Comparator<SchedulerApp> applicationComparator = 
     new Comparator<SchedulerApp>() {
     @Override
     public int compare(SchedulerApp a1, SchedulerApp a2) {
@@ -199,9 +199,18 @@ implements ResourceScheduler, CapacitySc
   public static final String ROOT_QUEUE = 
     CapacitySchedulerConfiguration.PREFIX + ROOT;
 
+  static class QueueHook {
+    public Queue hook(Queue queue) {
+      return queue;
+    }
+  }
+  private static final QueueHook noop = new QueueHook();
+  
   @Lock(CapacityScheduler.class)
   private void initializeQueues(CapacitySchedulerConfiguration conf) {
-    root = parseQueue(conf, null, ROOT, queues, queues);
+    root = 
+        parseQueue(this, conf, null, ROOT, queues, queues, 
+            queueComparator, applicationComparator, noop);
     LOG.info("Initialized root queue " + root);
   }
 
@@ -210,7 +219,9 @@ implements ResourceScheduler, CapacitySc
   throws IOException {
     // Parse new queues
     Map<String, Queue> newQueues = new HashMap<String, Queue>();
-    Queue newRoot = parseQueue(conf, null, ROOT, newQueues, queues);
+    Queue newRoot = 
+        parseQueue(this, conf, null, ROOT, newQueues, queues, 
+            queueComparator, applicationComparator, noop);
     
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
@@ -258,9 +269,14 @@ implements ResourceScheduler, CapacitySc
   }
   
   @Lock(CapacityScheduler.class)
-  private Queue parseQueue(CapacitySchedulerConfiguration conf, 
+  static Queue parseQueue(
+      CapacitySchedulerContext csContext, 
+      CapacitySchedulerConfiguration conf, 
       Queue parent, String queueName, Map<String, Queue> queues,
-      Map<String, Queue> oldQueues) {
+      Map<String, Queue> oldQueues, 
+      Comparator<Queue> queueComparator,
+      Comparator<SchedulerApp> applicationComparator,
+      QueueHook hook) {
     Queue queue;
     String[] childQueueNames = 
       conf.getQueues((parent == null) ? 
@@ -270,21 +286,27 @@ implements ResourceScheduler, CapacitySc
         throw new IllegalStateException(
             "Queue configuration missing child queue names for " + queueName);
       }
-      queue = new LeafQueue(this, queueName, parent, applicationComparator,
+      queue = new LeafQueue(csContext, queueName, parent, applicationComparator,
                             oldQueues.get(queueName));
+      
+      // Used only for unit tests
+      queue = hook.hook(queue);
     } else {
       ParentQueue parentQueue = 
-        new ParentQueue(this, queueName, queueComparator, parent,
+        new ParentQueue(csContext, queueName, queueComparator, parent,
                         oldQueues.get(queueName));
+
+      // Used only for unit tests
+      queue = hook.hook(parentQueue);
+      
       List<Queue> childQueues = new ArrayList<Queue>();
       for (String childQueueName : childQueueNames) {
         Queue childQueue = 
-          parseQueue(conf, parentQueue, childQueueName, queues, oldQueues);
+          parseQueue(csContext, conf, queue, childQueueName, 
+              queues, oldQueues, queueComparator, applicationComparator, hook);
         childQueues.add(childQueue);
       }
       parentQueue.setChildQueues(childQueues);
-
-      queue = parentQueue;
     }
 
     queues.put(queueName, queue);
@@ -293,12 +315,16 @@ implements ResourceScheduler, CapacitySc
     return queue;
   }
 
+  synchronized Queue getQueue(String queueName) {
+    return queues.get(queueName);
+  }
+  
   private synchronized void
       addApplication(ApplicationAttemptId applicationAttemptId,
           String queueName, String user) {
 
     // Sanity checks
-    Queue queue = queues.get(queueName);
+    Queue queue = getQueue(queueName);
     if (queue == null) {
       String message = "Application " + applicationAttemptId + 
       " submitted by user " + user + " to unknown queue: " + queueName;
@@ -314,10 +340,9 @@ implements ResourceScheduler, CapacitySc
       return;
     }
 
-    AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
-        applicationAttemptId, queueName, user, null);
+    // TODO: Fix store
     SchedulerApp SchedulerApp = 
-        new SchedulerApp(this.rmContext, appSchedulingInfo, queue);
+        new SchedulerApp(applicationAttemptId, user, queue, rmContext, null);
 
     // Submit to the queue
     try {
@@ -480,7 +505,7 @@ implements ResourceScheduler, CapacitySc
   }
 
   private synchronized void nodeUpdate(RMNode nm, 
-      Map<String,List<Container>> containers ) {
+      Map<ApplicationId, List<Container>> containers ) {
     LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     
     SchedulerNode node = getNode(nm.getNodeID());
@@ -562,12 +587,8 @@ implements ResourceScheduler, CapacitySc
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
-      Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
-      for (Map.Entry<ApplicationId, List<Container>> entry : contAppMapping.entrySet()) {
-        conts.put(entry.getKey().toString(), entry.getValue());
-      }
-      nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
+      nodeUpdate(nodeUpdatedEvent.getRMNode(), 
+          nodeUpdatedEvent.getContainers());
     }
     break;
     case APP_ADDED:
@@ -662,12 +683,12 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Lock(Lock.NoLock.class)
-  private SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
+  SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
     return applications.get(applicationAttemptId);
   }
 
   @Lock(Lock.NoLock.class)
-  private SchedulerNode getNode(NodeId nodeId) {
+  SchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug 10 06:03:11 2011
@@ -67,7 +67,7 @@ public class LeafQueue implements Queue 
   private static final Log LOG = LogFactory.getLog(LeafQueue.class);
 
   private final String queueName;
-  private final Queue parent;
+  private Queue parent;
   private float capacity;
   private float absoluteCapacity;
   private float maximumCapacity;
@@ -267,6 +267,38 @@ public class LeafQueue implements Queue 
     this.usedCapacity = usedCapacity;
   }
 
+  /**
+   * Set maximum capacity - used only for testing.
+   * @param maximumCapacity new max capacity
+   */
+  synchronized void setMaxCapacity(float maximumCapacity) {
+    this.maximumCapacity = maximumCapacity;
+    this.absoluteMaxCapacity = 
+      (maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ? 
+          Float.MAX_VALUE : 
+          (parent.getAbsoluteCapacity() * maximumCapacity);
+  }
+  
+  /**
+   * Set user limit - used only for testing.
+   * @param userLimit new user limit
+   */
+  synchronized void setUserLimit(int userLimit) {
+    this.userLimit = userLimit;
+  }
+
+  /**
+   * Set user limit factor - used only for testing.
+   * @param userLimitFactor new user limit factor
+   */
+  synchronized void setUserLimitFactor(int userLimitFactor) {
+    this.userLimitFactor = userLimitFactor;
+  }
+
+  synchronized void setParentQueue(Queue parent) {
+    this.parent = parent;
+  }
+  
   public synchronized int getNumApplications() {
     return applications.size();
   }
@@ -473,7 +505,7 @@ public class LeafQueue implements Queue 
   assignContainers(Resource clusterResource, SchedulerNode node) {
 
     LOG.info("DEBUG --- assignContainers:" +
-        " node=" + node.getNodeAddress() + 
+        " node=" + node.getHostName() + 
         " #applications=" + applications.size());
     
     // Check for reserved resources
@@ -505,10 +537,8 @@ public class LeafQueue implements Queue 
           }
 
           // Are we going over limits by allocating to this application?
-          
           ResourceRequest required = 
             application.getResourceRequest(priority, RMNode.ANY);
-          
 
           // Maximum Capacity of the queue
           if (!assignToQueue(clusterResource, required.getCapability())) {
@@ -599,11 +629,17 @@ public class LeafQueue implements Queue 
     return true;
   }
 
-  private void setUserResourceLimit(SchedulerApp application, Resource resourceLimit) {
+  private void setUserResourceLimit(SchedulerApp application, 
+      Resource resourceLimit) {
     application.setAvailableResourceLimit(resourceLimit);
     metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit);
   }
   
+  private int roundUp(int memory) {
+    return divideAndCeil(memory, minimumAllocation.getMemory()) * 
+        minimumAllocation.getMemory();
+  }
+  
   private Resource computeUserLimit(SchedulerApp application, 
       Resource clusterResource, Resource required) {
     // What is our current capacity? 
@@ -616,15 +652,13 @@ public class LeafQueue implements Queue 
    // Allow progress for queues with minuscule capacity
     final int queueCapacity = 
       Math.max(
-          divideAndCeil(
-              (int)(absoluteCapacity * clusterResource.getMemory()), 
-              minimumAllocation.getMemory()) 
-                * minimumAllocation.getMemory(),           // round up 
+          roundUp((int)(absoluteCapacity * clusterResource.getMemory())), 
           required.getMemory());
 
     final int consumed = usedResources.getMemory();
     final int currentCapacity = 
-      (consumed < queueCapacity) ? queueCapacity : (consumed + required.getMemory());
+      (consumed < queueCapacity) ? 
+          queueCapacity : (consumed + required.getMemory());
 
     // Never allow a single user to take more than the 
     // queue's configured capacity * user-limit-factor.
@@ -637,15 +671,19 @@ public class LeafQueue implements Queue 
     User user = getUser(userName);
 
     int limit = 
-      Math.min(
-          Math.max(divideAndCeil(currentCapacity, activeUsers), 
-              divideAndCeil((int)userLimit*currentCapacity, 100)),
+      roundUp(
+          Math.min(
+              Math.max(divideAndCeil(currentCapacity, activeUsers), 
+                       divideAndCeil((int)userLimit*currentCapacity, 100)),
               (int)(queueCapacity * userLimitFactor)
-      );
+              )
+          );
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("User limit computation for " + userName + 
-          " in queue " + getQueueName() + 
+          " in queue " + getQueueName() +
+          " userLimit=" + userLimit +
+          " userLimitFactor=" + userLimitFactor +
           " required: " + required + 
           " consumed: " + user.getConsumedResources() + 
           " limit: " + limit +
@@ -656,7 +694,7 @@ public class LeafQueue implements Queue 
           " clusterCapacity: " + clusterResource.getMemory()
       );
     }
-    
+
     return Resources.createResource(limit);
   }
   
@@ -665,7 +703,7 @@ public class LeafQueue implements Queue 
     User user = getUser(userName);
     
     // Note: We aren't considering the current request since there is a fixed
-    // overhead of the AM, so... 
+    // overhead of the AM, but it's a >= check, so... 
     if ((user.getConsumedResources().getMemory()) > limit.getMemory()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName() + 
@@ -694,8 +732,9 @@ public class LeafQueue implements Queue 
     return ((requiredContainers - reservedContainers) > 0);
   }
 
-  Resource assignContainersOnNode(Resource clusterResource, SchedulerNode node, 
-      SchedulerApp application, Priority priority, RMContainer reservedContainer) {
+  private Resource assignContainersOnNode(Resource clusterResource, 
+      SchedulerNode node, SchedulerApp application, 
+      Priority priority, RMContainer reservedContainer) {
 
     Resource assigned = Resources.none();
 
@@ -720,23 +759,23 @@ public class LeafQueue implements Queue 
         priority, reservedContainer);
   }
 
-  Resource assignNodeLocalContainers(Resource clusterResource, SchedulerNode node, 
-      SchedulerApp application, Priority priority, 
-      RMContainer reservedContainer) {
-    ResourceRequest request = application.getResourceRequest(priority, node
-        .getNodeAddress());
+  private Resource assignNodeLocalContainers(Resource clusterResource, 
+      SchedulerNode node, SchedulerApp application, 
+      Priority priority, RMContainer reservedContainer) {
+    ResourceRequest request = 
+        application.getResourceRequest(priority, node.getHostName());
     if (request != null) {
-      if (canAssign(application, priority, node, NodeType.DATA_LOCAL, 
+      if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
           reservedContainer)) {
-        return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.DATA_LOCAL, reservedContainer);
+        return assignContainer(clusterResource, node, application, priority, 
+            request, NodeType.NODE_LOCAL, reservedContainer);
       }
     }
     
     return Resources.none();
   }
 
-  Resource assignRackLocalContainers(Resource clusterResource,  
+  private Resource assignRackLocalContainers(Resource clusterResource,  
       SchedulerNode node, SchedulerApp application, Priority priority,
       RMContainer reservedContainer) {
     ResourceRequest request = 
@@ -751,7 +790,7 @@ public class LeafQueue implements Queue 
     return Resources.none();
   }
 
-  Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node, 
+  private Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node, 
       SchedulerApp application, Priority priority, 
       RMContainer reservedContainer) {
     ResourceRequest request = 
@@ -770,42 +809,24 @@ public class LeafQueue implements Queue 
   boolean canAssign(SchedulerApp application, Priority priority, 
       SchedulerNode node, NodeType type, RMContainer reservedContainer) {
 
-    ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, RMNode.ANY);
-
-    if (offSwitchRequest.getNumContainers() == 0) {
-      return false;
+    // Reserved... 
+    if (reservedContainer != null) {
+      return true;
     }
-
+    
+    // Clearly we need containers for this application...
     if (type == NodeType.OFF_SWITCH) {
       // 'Delay' off-switch
-      long missedNodes = application.getSchedulingOpportunities(priority);
+      ResourceRequest offSwitchRequest = 
+          application.getResourceRequest(priority, RMNode.ANY);
+      long missedOpportunities = application.getSchedulingOpportunities(priority);
       long requiredContainers = offSwitchRequest.getNumContainers(); 
       
       float localityWaitFactor = 
         application.getLocalityWaitFactor(priority, 
             scheduler.getNumClusterNodes());
       
-      if (requiredContainers > 0) {
-        // No 'delay' for reserved containers
-        if (reservedContainer != null) {
-          return true;
-        }
-        
-        // Check if we have waited long enough
-        if (missedNodes < (requiredContainers * localityWaitFactor)) {
-          LOG.info("Application " + application.getApplicationId() + 
-              " has missed " + missedNodes + " opportunities," +
-              " waitFactor= " + localityWaitFactor + 
-              " for cluster of size " + scheduler.getNumClusterNodes());
-          return false;
-        }
-        return true;
-      }
-      return false;
-      
-//      return ((requiredContainers > 0) && 
-//        (requiredContainers * localityWaitFactor) < missedNodes));
+      return ((requiredContainers * localityWaitFactor) > missedOpportunities);
     }
 
     // Check if we need containers on this rack 
@@ -813,15 +834,14 @@ public class LeafQueue implements Queue 
       application.getResourceRequest(priority, node.getRackName());
     if (type == NodeType.RACK_LOCAL) {
       if (rackLocalRequest == null) {
-        // No point waiting for rack-locality if we don't need this rack
-        return offSwitchRequest.getNumContainers() > 0;
+        return false;
       } else {
         return rackLocalRequest.getNumContainers() > 0;      
       }
     }
 
     // Check if we need containers on this host
-    if (type == NodeType.DATA_LOCAL) {
+    if (type == NodeType.NODE_LOCAL) {
       // First: Do we need containers on this rack?
       if (rackLocalRequest != null && rackLocalRequest.getNumContainers() == 0) {
         return false;
@@ -829,7 +849,7 @@ public class LeafQueue implements Queue 
       
       // Now check if we need containers on this host...
       ResourceRequest nodeLocalRequest = 
-        application.getResourceRequest(priority, node.getNodeAddress());
+        application.getResourceRequest(priority, node.getHostName());
       if (nodeLocalRequest != null) {
         return nodeLocalRequest.getNumContainers() > 0;
       }
@@ -840,10 +860,12 @@ public class LeafQueue implements Queue 
   
   private Container getContainer(RMContainer rmContainer, 
       SchedulerApp application, SchedulerNode node, Resource capability) {
-    if (rmContainer != null) {
-      return rmContainer.getContainer();
-    }
-
+    return (rmContainer != null) ? rmContainer.getContainer() :
+      createContainer(application, node, capability);
+  }
+  
+  public Container createContainer(SchedulerApp application, SchedulerNode node, 
+      Resource capability) {
     Container container = 
           BuilderUtils.newContainer(this.recordFactory,
               application.getApplicationAttemptId(),
@@ -875,11 +897,13 @@ public class LeafQueue implements Queue 
   private Resource assignContainer(Resource clusterResource, SchedulerNode node, 
       SchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer) {
-    LOG.info("DEBUG --- assignContainers:" +
-        " node=" + node.getNodeAddress() + 
-        " application=" + application.getApplicationId().getId() + 
-        " priority=" + priority.getPriority() + 
-        " request=" + request + " type=" + type);
+    if (LOG.isDebugEnabled()) {
+      LOG.info("DEBUG --- assignContainers:" +
+          " node=" + node.getHostName() + 
+          " application=" + application.getApplicationId().getId() + 
+          " priority=" + priority.getPriority() + 
+          " request=" + request + " type=" + type);
+    }
     Resource capability = request.getCapability();
 
     Resource available = node.getAvailableResource();
@@ -1005,7 +1029,7 @@ public class LeafQueue implements Queue 
     }
   }
 
-  private synchronized void allocateResource(Resource clusterResource, 
+  synchronized void allocateResource(Resource clusterResource, 
       String userName, Resource resource) {
     // Update queue metrics
     Resources.addTo(usedResources, resource);
@@ -1021,7 +1045,7 @@ public class LeafQueue implements Queue 
         " user=" + userName + " resources=" + user.getConsumedResources());
   }
 
-  private synchronized void releaseResource(Resource clusterResource, 
+  synchronized void releaseResource(Resource clusterResource, 
       String userName, Resource resource) {
     // Update queue metrics
     Resources.subtractFrom(usedResources, resource);

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Aug 10 06:03:11 2011
@@ -476,6 +476,20 @@ public class ParentQueue implements Queu
     this.utilization = utilization;
   }
 
+  /**
+   * Set maximum capacity - used only for testing.
+   * @param maximumCapacity new max capacity
+   */
+  synchronized void setMaxCapacity(float maximumCapacity) {
+    this.maximumCapacity = maximumCapacity;
+    float parentAbsoluteCapacity = 
+        (rootQueue) ? 100.0f : parent.getAbsoluteCapacity();
+    this.absoluteMaxCapacity = 
+      (maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ? 
+          Float.MAX_VALUE : 
+          (parentAbsoluteCapacity * maximumCapacity);
+  }
+
   @Override
   public synchronized Resource assignContainers(
       Resource clusterResource, SchedulerNode node) {
@@ -487,15 +501,13 @@ public class ParentQueue implements Queu
       
       // Are we over maximum-capacity for this queue?
       if (!assignToQueue(clusterResource)) {
-        LOG.info(getQueueName() + 
-            " current-capacity (" + getUtilization() + ") > max-capacity (" + 
-            absoluteMaxCapacity + ")");
         break;
       }
       
       // Schedule
-      Resource assignedToChild = assignContainersToChildQueues(clusterResource, node);
-
+      Resource assignedToChild = 
+          assignContainersToChildQueues(clusterResource, node);
+      
       // Done if no child-queue assigned anything
       if (Resources.greaterThan(assignedToChild, Resources.none())) {
         // Track resource utilization for the parent-queue
@@ -527,15 +539,16 @@ public class ParentQueue implements Queu
     
     return assigned;
   }
-  
+
   private synchronized boolean assignToQueue(Resource clusterResource) {
    // Check how much of the cluster's absolute capacity we are currently using...
     float currentCapacity = 
       (float)(usedResources.getMemory()) / clusterResource.getMemory();
-    if (currentCapacity > absoluteMaxCapacity) {
+    if (currentCapacity >= absoluteMaxCapacity) {
       LOG.info(getQueueName() + 
-          " current-capacity (" + currentCapacity + ") +" +
-          " > max-capacity (" + absoluteMaxCapacity + ")");
+          " used=" + usedResources.getMemory() + 
+          " current-capacity (" + currentCapacity + ") " +
+          " >= max-capacity (" + absoluteMaxCapacity + ")");
       return false;
     }
     return true;
@@ -561,6 +574,9 @@ public class ParentQueue implements Queu
       		" queue: " + childQueue.getQueuePath() + 
       		" stats: " + childQueue);
       assigned = childQueue.assignContainers(cluster, node);
+      LOG.info("DEBUG --- Assignedto" +
+          " queue: " + childQueue.getQueuePath() + 
+          " stats: " + childQueue + " --> " + assigned.getMemory());
 
       // If we do assign, remove the queue and re-insert in-order to re-sort
       if (Resources.greaterThan(assigned, Resources.none())) {
@@ -615,14 +631,14 @@ public class ParentQueue implements Queu
     }
   }
   
-  private synchronized void allocateResource(Resource clusterResource, 
+  synchronized void allocateResource(Resource clusterResource, 
       Resource resource) {
     Resources.addTo(usedResources, resource);
     updateResource(clusterResource);
     ++numContainers;
   }
   
-  private synchronized void releaseResource(Resource clusterResource, 
+  synchronized void releaseResource(Resource clusterResource, 
       Resource resource) {
     Resources.subtractFrom(usedResources, resource);
     updateResource(clusterResource);

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Wed Aug 10 06:03:11 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug 10 06:03:11 2011
@@ -279,10 +279,10 @@ public class FifoScheduler implements Re
   
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
-    AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
-        appAttemptId, queueName, user, null);
+    // TODO: Fix store
     SchedulerApp schedulerApp = 
-        new SchedulerApp(this.rmContext, appSchedulingInfo, DEFAULT_QUEUE);
+        new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, 
+            this.rmContext, null);
     applications.put(appAttemptId, schedulerApp);
     metrics.submitApp(user);
     LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
@@ -379,7 +379,7 @@ public class FifoScheduler implements Re
       maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers());
     }
 
-    if (type == NodeType.DATA_LOCAL) {
+    if (type == NodeType.NODE_LOCAL) {
       ResourceRequest nodeLocalRequest = 
         application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
       if (nodeLocalRequest != null) {
@@ -427,11 +427,11 @@ public class FifoScheduler implements Re
       int assignableContainers = 
         Math.min(
             getMaxAllocatableContainers(application, priority, node, 
-                NodeType.DATA_LOCAL), 
+                NodeType.NODE_LOCAL), 
                 request.getNumContainers());
       assignedContainers = 
         assignContainer(node, application, priority, 
-            assignableContainers, request, NodeType.DATA_LOCAL);
+            assignableContainers, request, NodeType.NODE_LOCAL);
     }
     return assignedContainers;
   }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Wed Aug 10 06:03:11 2011
@@ -293,7 +293,7 @@ public class Application {
     int numContainers = containers.size();
     // Schedule in priority order
     for (Priority priority : requests.keySet()) {
-      assign(priority, NodeType.DATA_LOCAL, containers);
+      assign(priority, NodeType.NODE_LOCAL, containers);
       assign(priority, NodeType.RACK_LOCAL, containers);
       assign(priority, NodeType.OFF_SWITCH, containers);
 
@@ -351,7 +351,7 @@ public class Application {
 
   private void updateResourceRequests(Map<String, ResourceRequest> requests, 
       NodeType type, Task task) {
-    if (type == NodeType.DATA_LOCAL) {
+    if (type == NodeType.NODE_LOCAL) {
       for (String host : task.getHosts()) {
         LOG.info("DEBUG --- updateResourceRequests:" +
             " application=" + applicationId +
@@ -362,7 +362,7 @@ public class Application {
       }
     }
     
-    if (type == NodeType.DATA_LOCAL || type == NodeType.RACK_LOCAL) {
+    if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) {
       for (String rack : task.getRacks()) {
         LOG.info("DEBUG --- updateResourceRequests:" +
             " application=" + applicationId +

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Wed Aug 10 06:03:11 2011
@@ -141,7 +141,7 @@ public class MockNodes {
       }
 
       @Override
-      public String getNodeHostName() {
+      public String getHostName() {
         return hostName;
       }
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java?rev=1156037&r1=1156036&r2=1156037&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java Wed Aug 10 06:03:11 2011
@@ -97,7 +97,7 @@ public class Task {
   }
   
   public boolean canSchedule(NodeType type, String hostName) {
-    if (type == NodeType.DATA_LOCAL) { 
+    if (type == NodeType.NODE_LOCAL) { 
       return hosts.contains(hostName);
     } else if (type == NodeType.RACK_LOCAL) {
       return racks.contains(Application.resolve(hostName));

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1156037&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Wed Aug 10 06:03:11 2011
@@ -0,0 +1,743 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestLeafQueue {
+  private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
+  
+  private final RecordFactory recordFactory = 
+      RecordFactoryProvider.getRecordFactory(null);
+
+  RMContext rmContext;
+  CapacityScheduler cs;
+  CapacitySchedulerConfiguration csConf;
+  CapacitySchedulerContext csContext;
+  
+  Queue root;
+  Map<String, Queue> queues = new HashMap<String, Queue>();
+  
+  final static int GB = 1024;
+  final static String DEFAULT_RACK = "/default";
+
+  @Before
+  public void setUp() throws Exception {
+    cs = new CapacityScheduler();
+    rmContext = TestUtils.getMockRMContext();
+    
+    csConf = 
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    
+    
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
+    root = 
+        CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
+            queues, queues, 
+            CapacityScheduler.queueComparator, 
+            CapacityScheduler.applicationComparator, 
+            TestUtils.spyHook);
+
+    cs.reinitialize(csConf, null, rmContext);
+  }
+  
+  private static final String A = "a";
+  private static final String B = "b";
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+    
+    // Define top-level queues
+    conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B});
+    conf.setCapacity(CapacityScheduler.ROOT, 100);
+    
+    final String Q_A = CapacityScheduler.ROOT + "." + A;
+    conf.setCapacity(Q_A, 10);
+    
+    final String Q_B = CapacityScheduler.ROOT + "." + B;
+    conf.setCapacity(Q_B, 90);
+    
+    LOG.info("Setup top-level queues a and b");
+  }
+
+  private LeafQueue stubLeafQueue(LeafQueue queue) {
+    
+    // Mock some methods for ease in these unit tests
+    
+    // 1. LeafQueue.createContainer to return dummy containers
+    doAnswer(
+        new Answer<Container>() {
+          @Override
+          public Container answer(InvocationOnMock invocation) 
+              throws Throwable {
+            final SchedulerApp application = 
+                (SchedulerApp)(invocation.getArguments()[0]);
+            final ContainerId containerId =                 
+                TestUtils.getMockContainerId(application);
+
+            Container container = TestUtils.getMockContainer(
+                containerId,
+                ((SchedulerNode)(invocation.getArguments()[1])).getNodeID(), 
+                (Resource)(invocation.getArguments()[2]));
+            return container;
+          }
+        }
+      ).
+      when(queue).createContainer(
+              any(SchedulerApp.class), 
+              any(SchedulerNode.class), 
+              any(Resource.class));
+    
+    // 2. Stub out LeafQueue.parent.completedContainer
+    Queue parent = queue.getParent();
+    doNothing().when(parent).completedContainer(
+        any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class), 
+        any(RMContainer.class), any(RMContainerEventType.class));
+    
+    return queue;
+  }
+  
+  @Test
+  public void testSingleQueueWithOneUser() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    SchedulerApp app_1 = 
+        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+    a.submitApplication(app_1, user_0, A);  // same user
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    
+    final int numNodes = 1;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority, 
+                recordFactory))); 
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory))); 
+
+    // Start testing...
+    
+    // Only 1 container
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
+    // you can get one container more than user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    
+    // Can't allocate 3rd due to user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    
+    // Bump up user-limit-factor, now allocate should work
+    a.setUserLimitFactor(10);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // One more should work, for app_1, due to user-limit-factor
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Test max-capacity
+    // Now - no more allocs since we are at max-cap
+    a.setMaxCapacity(0.5f);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    
+    // Release each container from app_0
+    for (RMContainer rmContainer : app_0.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_0, node_0, rmContainer, 
+          RMContainerEventType.KILL);
+    }
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    
+    // Release each container from app_1
+    for (RMContainer rmContainer : app_1.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_1, node_0, rmContainer, 
+          RMContainerEventType.KILL);
+    }
+    assertEquals(0*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+  }
+  
+  /**
+   * Exercises a single leaf queue shared by multiple users. Verifies that
+   * user-limit, user-limit-factor and max-capacity are enforced when several
+   * users compete for the queue, and that queue/app usage is correctly
+   * reclaimed as containers are released.
+   */
+  @Test
+  public void testSingleQueueWithMultipleUsers() throws Exception {
+    
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    final String user_2 = "user_2";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    SchedulerApp app_1 = 
+        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+    a.submitApplication(app_1, user_0, A);  // same user
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    
+    final int numNodes = 1;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority,
+                recordFactory))); 
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority,
+            recordFactory))); 
+
+    /** 
+     * Start testing... 
+     */
+    
+    // Only 1 container
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
+    // you can get one container more than user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    
+    // Can't allocate 3rd due to user-limit
+    a.setUserLimit(25);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Submit more apps
+    final ApplicationAttemptId appAttemptId_2 = 
+        TestUtils.getMockApplicationAttemptId(2, 0); 
+    SchedulerApp app_2 = 
+        new SchedulerApp(appAttemptId_2, user_1, a, rmContext, null);
+    a.submitApplication(app_2, user_1, A);
+
+    final ApplicationAttemptId appAttemptId_3 = 
+        TestUtils.getMockApplicationAttemptId(3, 0); 
+    SchedulerApp app_3 = 
+        new SchedulerApp(appAttemptId_3, user_2, a, rmContext, null);
+    a.submitApplication(app_3, user_2, A);
+    
+    app_2.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority,
+            recordFactory))); 
+
+    app_3.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory))); 
+
+    // Now allocations should go to app_2 since 
+    // user_0 is at limit in spite of high user-limit-factor
+    a.setUserLimitFactor(10);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(5*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Now allocations should go to app_0 since 
+    // user_0 is at user-limit not above it
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(6*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
+    
+    // Test max-capacity
+    // Now - no more allocs since we are at max-cap
+    a.setMaxCapacity(0.5f);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(6*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
+    
+    // Revert max-capacity and user-limit-factor
+    // Now, allocations should go to app_3 since it's under user-limit 
+    a.setMaxCapacity(-1);
+    a.setUserLimitFactor(1);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(7*GB, a.getUsedResources().getMemory()); 
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Now we should assign to app_3 again since user_2 is under user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(8*GB, a.getUsedResources().getMemory()); 
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Release each container from app_0
+    for (RMContainer rmContainer : app_0.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_0, node_0, rmContainer, 
+          RMContainerEventType.KILL);
+    }
+    assertEquals(5*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
+    
+    // Release each container from app_2
+    for (RMContainer rmContainer : app_2.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_2, node_0, rmContainer, 
+          RMContainerEventType.KILL);
+    }
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Release each container from app_3
+    for (RMContainer rmContainer : app_3.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_3, node_0, rmContainer, 
+          RMContainerEventType.KILL);
+    }
+    assertEquals(0*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
+  }
+  
+  /**
+   * Verifies container reservation: when a node cannot fit a large request
+   * (4G on a 4G node with 2G already used), the queue reserves the resource
+   * for the app; the reservation persists while space frees up and is
+   * fulfilled once enough capacity becomes available on the node.
+   */
+  @Test
+  public void testReservation() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    SchedulerApp app_1 = 
+        new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
+    a.submitApplication(app_1, user_1, A);  
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
+    
+    final int numNodes = 1;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+                recordFactory))); 
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority,
+            recordFactory))); 
+
+    // Start testing...
+    
+    // Only 1 container
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
+    // you can get one container more than user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    
+    // Now, reservation should kick in for app_1
+    // (queue usage counts the 4G reservation; node usage does not)
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(6*GB, a.getUsedResources().getMemory()); 
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(2*GB, node_0.getUsedResource().getMemory());
+    
+    // Now free 1 container from app_0 i.e. 1G
+    a.completedContainer(clusterResource, app_0, node_0, 
+        app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(5*GB, a.getUsedResources().getMemory()); 
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(1*GB, node_0.getUsedResource().getMemory());
+
+    // Now finish another container from app_0 and fulfill the reservation
+    a.completedContainer(clusterResource, app_0, node_0, 
+        app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(4*GB, node_0.getUsedResource().getMemory());
+  }
+  
+  
+  /**
+   * Verifies delay (locality-aware) scheduling: off-switch allocation is
+   * deferred until the number of missed scheduling opportunities reaches the
+   * number of required containers, while node-local and rack-local requests
+   * are satisfied immediately and reset the opportunity counter.
+   */
+  @Test
+  public void testLocalityScheduling() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+    
+    // Submit applications
+    // (spy so that calls to allocate() can be verified below)
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+    a.submitApplication(app_0, user_0, A);
+    
+    // Setup some nodes and racks
+    String host_0 = "host_0";
+    String rack_0 = "rack_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
+    
+    String host_1 = "host_1";
+    String rack_1 = "rack_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
+    
+    String host_2 = "host_2";
+    String rack_2 = "rack_2";
+    SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    
+    // Setup resource-requests and submit
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, // one extra 
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start testing...
+    
+    // Start with off switch, shouldn't allocate due to delay scheduling
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+
+    // Another off switch, shouldn't allocate due to delay scheduling
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(2, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+    
+    // Another off switch, shouldn't allocate due to delay scheduling
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(3, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+    
+    // Another off switch, now we should allocate 
+    // since missedOpportunities=3 and reqdContainers=3
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(2, app_0.getTotalRequiredResources(priority));
+    
+    // NODE_LOCAL - node_0
+    a.assignContainers(clusterResource, node_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(1, app_0.getTotalRequiredResources(priority));
+    
+    // NODE_LOCAL - node_1
+    a.assignContainers(clusterResource, node_1);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+    
+    // Add 1 more request to check for RACK_LOCAL
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // one extra 
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    assertEquals(1, app_0.getTotalRequiredResources(priority));
+    
+    String host_3 = "host_3"; // on rack_1
+    SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
+    
+    a.assignContainers(clusterResource, node_3);
+    verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+  }
+  
+  /**
+   * Verifies scheduling across priorities within one application: lower
+   * priorities (P2) are not served while a higher priority (P1) is blocked
+   * by delay scheduling; once P1 is satisfied, P2 requests are allocated.
+   */
+  @Test
+  public void testApplicationPriorityScheduling() throws Exception {
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+    
+    // Submit applications
+    // (spy so that calls to allocate() can be verified below)
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+    a.submitApplication(app_0, user_0, A);
+    
+    // Setup some nodes and racks
+    String host_0 = "host_0";
+    String rack_0 = "rack_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
+    
+    String host_1 = "host_1";
+    String rack_1 = "rack_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
+    
+    String host_2 = "host_2";
+    String rack_2 = "rack_2";
+    SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    
+    // Setup resource-requests and submit
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    
+    // P1
+    Priority priority_1 = TestUtils.createMockPriority(1);
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0, 1*GB, 1, 
+            priority_1, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
+            priority_1, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1, 1*GB, 1, 
+            priority_1, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            priority_1, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, 
+            priority_1, recordFactory));
+    
+    // P2
+    Priority priority_2 = TestUtils.createMockPriority(2);
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_2, 2*GB, 1, 
+            priority_2, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_2, 2*GB, 1, 
+            priority_2, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, 
+            priority_2, recordFactory));
+    
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start testing...
+    
+    // Start with off switch, shouldn't allocate P1 due to delay scheduling
+    // thus, no P2 either!
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(2, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_2));
+
+    // Another off-switch, shouldn't allocate P1 due to delay scheduling
+    // thus, no P2 either!
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(2, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_2));
+
+    // Another off-switch, should allocate OFF_SWITCH P1
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_2));
+
+    // Now, NODE_LOCAL for P1
+    a.assignContainers(clusterResource, node_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(0, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0), 
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_2));
+
+    // Now, OFF_SWITCH for P2
+    a.assignContainers(clusterResource, node_1);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), 
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(0, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1), 
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(0, app_0.getTotalRequiredResources(priority_2));
+
+  }
+  
+  /** Intentionally empty; kept as a JUnit lifecycle hook for future cleanup. */
+  @After
+  public void tearDown() throws Exception {
+  }
+}



Mime
View raw message