hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1165404 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/...
Date Mon, 05 Sep 2011 19:54:47 GMT
Author: acmurthy
Date: Mon Sep  5 19:54:46 2011
New Revision: 1165404

URL: http://svn.apache.org/viewvc?rev=1165404&view=rev
Log:
Merge -r 1165402:1165403 from trunk to branch-0.23 to fix MAPREDUCE-2697.

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
      - copied unchanged from r1165403, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Sep  5 19:54:46 2011
@@ -231,6 +231,11 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2735. Add an applications summary log to ResourceManager.
     (Thomas Graves via acmurthy) 
 
+    MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running
+    applications per-queue & per-user. (acmurthy) 
+    Configuration changes:
+      add yarn.capacity-scheduler.maximum-am-resource-percent
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java Mon Sep  5 19:54:46 2011
@@ -29,8 +29,6 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder;
-import org.mortbay.log.Log;
-
 
     
 public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements ContainerId {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Sep  5 19:54:46 2011
@@ -167,6 +167,11 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Override
+  public Resource getClusterResources() {
+    return clusterResource;
+  }
+  
+  @Override
   public synchronized void reinitialize(Configuration conf,
       ContainerTokenSecretManager containerTokenSecretManager, RMContext rmContext) 
   throws IOException {
@@ -621,6 +626,7 @@ implements ResourceScheduler, CapacitySc
   private synchronized void addNode(RMNode nodeManager) {
     this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    root.updateClusterResource(clusterResource);
     ++numNodeManagers;
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
@@ -629,6 +635,7 @@ implements ResourceScheduler, CapacitySc
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+    root.updateClusterResource(clusterResource);
     --numNodeManagers;
 
     // Remove running containers

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Mon Sep  5 19:54:46 2011
@@ -50,6 +50,10 @@ public class CapacitySchedulerConfigurat
     PREFIX + "maximum-applications";
   
   @Private
+  public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
+    PREFIX + "maximum-am-resource-percent";
+  
+  @Private
   public static final String QUEUES = "queues";
   
   @Private
@@ -83,6 +87,10 @@ public class CapacitySchedulerConfigurat
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
   
   @Private
+  public static final float 
+  DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
+  
+  @Private
   public static final int UNDEFINED = -1;
   
   @Private
@@ -124,6 +132,11 @@ public class CapacitySchedulerConfigurat
     return maxApplications;
   }
   
+  public float getMaximumApplicationMasterResourcePercent() {
+    return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 
+        DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
+  }
+  
   public int getCapacity(String queue) {
     int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED);
     if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Mon Sep  5 19:54:46 2011
@@ -37,4 +37,6 @@ public interface CapacitySchedulerContex
   int getNumClusterNodes();
 
   RMContext getRMContext();
+  
+  Resource getClusterResources();
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon Sep  5 19:54:46 2011
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -77,15 +78,22 @@ public class LeafQueue implements Queue 
 
   private int maxApplications;
   private int maxApplicationsPerUser;
+  
+  private float maxAMResourcePercent;
+  private int maxActiveApplications;
+  private int maxActiveApplicationsPerUser;
+  
   private Resource usedResources = Resources.createResource(0);
   private float utilization = 0.0f;
   private float usedCapacity = 0.0f;
   private volatile int numContainers;
 
-  Set<SchedulerApp> applications;
+  Set<SchedulerApp> activeApplications;
   Map<ApplicationAttemptId, SchedulerApp> applicationsMap = 
       new HashMap<ApplicationAttemptId, SchedulerApp>();
   
+  Set<SchedulerApp> pendingApplications;
+  
   private final Resource minimumAllocation;
   private final Resource maximumAllocation;
   private final float minimumAllocationFactor;
@@ -108,6 +116,8 @@ public class LeafQueue implements Queue 
 
   private CapacitySchedulerContext scheduler;
   
+  final static int DEFAULT_AM_RESOURCE = 2 * 1024;
+  
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, Queue parent, 
       Comparator<SchedulerApp> applicationComparator, Queue old) {
@@ -144,6 +154,15 @@ public class LeafQueue implements Queue 
     int maxApplicationsPerUser = 
       (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
 
+    this.maxAMResourcePercent = 
+        cs.getConfiguration().getMaximumApplicationMasterResourcePercent();
+    int maxActiveApplications = 
+        computeMaxActiveApplications(cs.getClusterResources(), 
+            maxAMResourcePercent, absoluteCapacity);
+    int maxActiveApplicationsPerUser = 
+        computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, 
+            userLimitFactor);
+
     this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
     this.queueInfo.setQueueName(queueName);
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
@@ -157,20 +176,38 @@ public class LeafQueue implements Queue 
         maximumCapacity, absoluteMaxCapacity, 
         userLimit, userLimitFactor, 
         maxApplications, maxApplicationsPerUser,
+        maxActiveApplications, maxActiveApplicationsPerUser,
         state, acls);
 
     LOG.info("DEBUG --- LeafQueue:" +
         " name=" + queueName + 
         ", fullname=" + getQueuePath());
 
-    this.applications = new TreeSet<SchedulerApp>(applicationComparator);
+    this.pendingApplications = 
+        new TreeSet<SchedulerApp>(applicationComparator);
+    this.activeApplications = new TreeSet<SchedulerApp>(applicationComparator);
   }
 
+  private int computeMaxActiveApplications(Resource clusterResource,
+      float maxAMResourcePercent, float absoluteCapacity) {
+    return 
+        Math.max(
+            (int)((clusterResource.getMemory() / DEFAULT_AM_RESOURCE) * 
+                   maxAMResourcePercent * absoluteCapacity), 
+            1);
+  }
+  
+  private int computeMaxActiveApplicationsPerUser(int maxActiveApplications, 
+      int userLimit, float userLimitFactor) {
+    return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor);
+  }
+  
   private synchronized void setupQueueConfigs(
       float capacity, float absoluteCapacity, 
       float maxCapacity, float absoluteMaxCapacity,
       int userLimit, float userLimitFactor,
       int maxApplications, int maxApplicationsPerUser,
+      int maxActiveApplications, int maxActiveApplicationsPerUser,
       QueueState state, Map<QueueACL, AccessControlList> acls)
   {
     this.capacity = capacity; 
@@ -185,6 +222,9 @@ public class LeafQueue implements Queue 
     this.maxApplications = maxApplications;
     this.maxApplicationsPerUser = maxApplicationsPerUser;
 
+    this.maxActiveApplications = maxActiveApplications;
+    this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
+    
     this.state = state;
 
     this.acls = acls;
@@ -269,6 +309,22 @@ public class LeafQueue implements Queue 
     return minimumAllocationFactor;
   }
 
+  public int getMaxApplications() {
+    return maxApplications;
+  }
+
+  public int getMaxApplicationsPerUser() {
+    return maxApplicationsPerUser;
+  }
+
+  public int getMaximumActiveApplications() {
+    return maxActiveApplications;
+  }
+
+  public int getMaximumActiveApplicationsPerUser() {
+    return maxActiveApplicationsPerUser;
+  }
+
   @Override
   public synchronized float getUsedCapacity() {
     return usedCapacity;
@@ -329,10 +385,34 @@ public class LeafQueue implements Queue 
     this.parent = parent;
   }
   
+  @Override
   public synchronized int getNumApplications() {
-    return applications.size();
+    return getNumPendingApplications() + getNumActiveApplications();
+  }
+
+  public synchronized int getNumPendingApplications() {
+    return pendingApplications.size();
+  }
+
+  public synchronized int getNumActiveApplications() {
+    return activeApplications.size();
+  }
+
+  @Private
+  public synchronized int getNumApplications(String user) {
+    return getUser(user).getTotalApplications();
+  }
+
+  @Private
+  public synchronized int getNumPendingApplications(String user) {
+    return getUser(user).getPendingApplications();
   }
 
+  @Private
+  public synchronized int getNumActiveApplications(String user) {
+    return getUser(user).getActiveApplications();
+  }
+  
   public synchronized int getNumContainers() {
     return numContainers;
   }
@@ -342,6 +422,16 @@ public class LeafQueue implements Queue 
     return state;
   }
 
+  @Private
+  public int getUserLimit() {
+    return userLimit;
+  }
+
+  @Private
+  public float getUserLimitFactor() {
+    return userLimitFactor;
+  }
+
   @Override
   public synchronized Map<QueueACL, AccessControlList> getQueueAcls() {
     return new HashMap<QueueACL, AccessControlList>(acls);
@@ -404,6 +494,8 @@ public class LeafQueue implements Queue 
         leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, 
         leafQueue.userLimit, leafQueue.userLimitFactor, 
         leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
+        leafQueue.maxActiveApplications, 
+        leafQueue.maxActiveApplicationsPerUser,
         leafQueue.state, leafQueue.acls);
     
     updateResource(clusterResource);
@@ -443,7 +535,7 @@ public class LeafQueue implements Queue 
     synchronized (this) {
 
       // Check if the queue is accepting jobs
-      if (state != QueueState.RUNNING) {
+      if (getState() != QueueState.RUNNING) {
         String msg = "Queue " + getQueuePath() +
         " is STOPPED. Cannot accept submission of application: " +
         application.getApplicationId();
@@ -452,7 +544,7 @@ public class LeafQueue implements Queue 
       }
 
       // Check submission limits for queues
-      if (getNumApplications() >= maxApplications) {
+      if (getNumApplications() >= getMaxApplications()) {
         String msg = "Queue " + getQueuePath() + 
         " already has " + getNumApplications() + " applications," +
         " cannot accept submission of application: " + 
@@ -463,9 +555,9 @@ public class LeafQueue implements Queue 
 
       // Check submission limits for the user on this queue
       user = getUser(userName);
-      if (user.getApplications() >= maxApplicationsPerUser) {
+      if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
         String msg = "Queue " + getQueuePath() + 
-        " already has " + user.getApplications() + 
+        " already has " + user.getTotalApplications() + 
         " applications from user " + userName + 
         " cannot accept submission of application: " + 
         application.getApplicationId();
@@ -490,17 +582,46 @@ public class LeafQueue implements Queue 
     }
   }
 
+  private synchronized void activateApplications() {
+    for (Iterator<SchedulerApp> i=pendingApplications.iterator(); 
+         i.hasNext(); ) {
+      SchedulerApp application = i.next();
+      
+      // Check queue limit
+      if (getNumActiveApplications() >= getMaximumActiveApplications()) {
+        break;
+      }
+      
+      // Check user limit
+      User user = getUser(application.getUser());
+      if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) {
+        user.activateApplication();
+        activeApplications.add(application);
+        i.remove();
+        LOG.info("Application " + application.getApplicationId().getId() + 
+            " from user: " + application.getUser() + 
+            " activated in queue: " + getQueueName());
+      }
+    }
+  }
+  
   private synchronized void addApplication(SchedulerApp application, User user) {
     // Accept 
     user.submitApplication();
-    applications.add(application);
+    pendingApplications.add(application);
     applicationsMap.put(application.getApplicationAttemptId(), application);
 
+    // Activate applications
+    activateApplications();
+    
     LOG.info("Application added -" +
         " appId: " + application.getApplicationId() +
         " user: " + user + "," + " leaf-queue: " + getQueueName() +
-        " #user-applications: " + user.getApplications() + 
-        " #queue-applications: " + getNumApplications());
+        " #user-pending-applications: " + user.getPendingApplications() +
+        " #user-active-applications: " + user.getActiveApplications() +
+        " #queue-pending-applications: " + getNumPendingApplications() +
+        " #queue-active-applications: " + getNumActiveApplications()
+        );
   }
 
   @Override
@@ -515,20 +636,26 @@ public class LeafQueue implements Queue 
   }
 
   public synchronized void removeApplication(SchedulerApp application, User user) {
-    applications.remove(application);
+    activeApplications.remove(application);
     applicationsMap.remove(application.getApplicationAttemptId());
 
     user.finishApplication();
-    if (user.getApplications() == 0) {
+    if (user.getTotalApplications() == 0) {
       users.remove(application.getUser());
     }
 
+    // Check if we can activate more applications
+    activateApplications();
+    
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
         " user: " + application.getUser() + 
         " queue: " + getQueueName() +
-        " #user-applications: " + user.getApplications() + 
-        " #queue-applications: " + getNumApplications());
+        " #user-pending-applications: " + user.getPendingApplications() +
+        " #user-active-applications: " + user.getActiveApplications() +
+        " #queue-pending-applications: " + getNumPendingApplications() +
+        " #queue-active-applications: " + getNumActiveApplications()
+        );
   }
   
   private synchronized SchedulerApp getApplication(
@@ -542,7 +669,7 @@ public class LeafQueue implements Queue 
 
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getHostName() + 
-        " #applications=" + applications.size());
+        " #applications=" + activeApplications.size());
     
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
@@ -554,7 +681,7 @@ public class LeafQueue implements Queue 
     }
     
     // Try to assign containers to applications in order
-    for (SchedulerApp application : applications) {
+    for (SchedulerApp application : activeApplications) {
       
       LOG.info("DEBUG --- pre-assignContainers for application "
           + application.getApplicationId());
@@ -1119,7 +1246,16 @@ public class LeafQueue implements Queue 
   }
 
   @Override
-  public synchronized void updateResource(Resource clusterResource) {
+  public synchronized void updateClusterResource(Resource clusterResource) {
+    maxActiveApplications = 
+        computeMaxActiveApplications(clusterResource, maxAMResourcePercent, 
+            absoluteCapacity);
+    maxActiveApplicationsPerUser = 
+        computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, 
+            userLimitFactor);
+  }
+  
+  private synchronized void updateResource(Resource clusterResource) {
     float queueLimit = clusterResource.getMemory() * absoluteCapacity; 
     setUtilization(usedResources.getMemory() / queueLimit);
     setUsedCapacity(
@@ -1138,22 +1274,36 @@ public class LeafQueue implements Queue 
 
   static class User {
     Resource consumed = Resources.createResource(0);
-    int applications = 0;
+    int pendingApplications = 0;
+    int activeApplications = 0;
 
     public Resource getConsumedResources() {
       return consumed;
     }
 
-    public int getApplications() {
-      return applications;
+    public int getPendingApplications() {
+      return pendingApplications;
     }
 
+    public int getActiveApplications() {
+      return activeApplications;
+    }
+
+    public int getTotalApplications() {
+      return getPendingApplications() + getActiveApplications();
+    }
+    
     public synchronized void submitApplication() {
-      ++applications;
+      ++pendingApplications;
+    }
+    
+    public synchronized void activateApplication() {
+      --pendingApplications;
+      ++activeApplications;
     }
 
     public synchronized void finishApplication() {
-      --applications;
+      --activeApplications;
     }
 
     public synchronized void assignContainer(Resource resource) {
@@ -1175,4 +1325,5 @@ public class LeafQueue implements Queue 
     parent.recoverContainer(clusterResource, application, container);
 
   }
+  
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Mon Sep  5 19:54:46 2011
@@ -646,7 +646,14 @@ public class ParentQueue implements Queu
   }
 
   @Override
-  public synchronized void updateResource(Resource clusterResource) {
+  public synchronized void updateClusterResource(Resource clusterResource) {
+    // Update all children
+    for (Queue childQueue : childQueues) {
+      childQueue.updateClusterResource(clusterResource);
+    }
+  }
+  
+  private synchronized void updateResource(Resource clusterResource) {
     float queueLimit = clusterResource.getMemory() * absoluteCapacity; 
     setUtilization(usedResources.getMemory() / queueLimit);
     setUsedCapacity(

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Mon Sep  5 19:54:46 2011
@@ -190,7 +190,7 @@ extends org.apache.hadoop.yarn.server.re
    * Update the cluster resource for queues as we add/remove nodes
    * @param clusterResource the current cluster resource
    */
-  public void updateResource(Resource clusterResource);
+  public void updateClusterResource(Resource clusterResource);
   
   /**
    * Recover the state of the queue

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml Mon Sep  5 19:54:46 2011
@@ -6,6 +6,11 @@
   </property>
 
   <property>
+    <name>yarn.capacity-scheduler.maximum-am-resource-percent</name>
+    <value>0.1</value>
+  </property>
+
+  <property>
     <name>yarn.capacity-scheduler.root.queues</name>
     <value>default</value>
   </property>

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Mon Sep  5 19:54:46 2011
@@ -83,8 +83,12 @@ public class TestLeafQueue {
     
     csContext = mock(CapacitySchedulerContext.class);
     when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
-    when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
+    when(csContext.getMinimumResourceCapability()).
+        thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).
+        thenReturn(Resources.createResource(16*GB));
+    when(csContext.getClusterResources()).
+        thenReturn(Resources.createResource(100 * 16 * GB));
     root = 
         CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
             queues, queues, 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Mon Sep  5 19:54:46 2011
@@ -60,6 +60,8 @@ public class TestParentQueue {
         Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(16*GB));
+    when(csContext.getClusterResources()).
+        thenReturn(Resources.createResource(100 * 16 * GB));
   }
   
   private static final String A = "a";

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java?rev=1165404&r1=1165403&r2=1165404&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Mon Sep  5 19:54:46 2011
@@ -116,6 +116,13 @@ public class TestUtils {
     return request;
   }
   
+  public static ApplicationId getMockApplicationId(int appId) {
+    ApplicationId applicationId = mock(ApplicationId.class);
+    when(applicationId.getClusterTimestamp()).thenReturn(0L);
+    when(applicationId.getId()).thenReturn(appId);
+    return applicationId;
+  }
+  
   public static ApplicationAttemptId 
   getMockApplicationAttemptId(int appId, int attemptId) {
     ApplicationId applicationId = mock(ApplicationId.class);



Mime
View raw message