Subject: svn commit: r1165403 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache... 
Date: Mon, 05 Sep 2011 19:49:47 -0000
To: mapreduce-commits@hadoop.apache.org
From: acmurthy@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20110905194948.A5909238889B@eris.apache.org>

Author: acmurthy
Date: Mon Sep 5 19:49:47 2011
New Revision: 1165403

URL: http://svn.apache.org/viewvc?rev=1165403&view=rev
Log:
MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running applications per-queue & per-user.

Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Sep 5 19:49:47 2011 @@ -1,6 +1,7 @@ Hadoop MapReduce Change Log Trunk (unreleased changes) + IMPROVEMENTS MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) @@ -236,6 +237,11 @@ Release 0.23.0 - Unreleased MAPREDUCE-2735. Add an applications summary log to ResourceManager. (Thomas Graves via acmurthy) + MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running + applications per-queue & per-user. 
(acmurthy) + Configuration changes: + add yarn.capacity-scheduler.maximum-am-resource-percent + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java Mon Sep 5 19:49:47 2011 @@ -29,8 +29,6 @@ import org.apache.hadoop.yarn.proto.Yarn import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder; -import org.mortbay.log.Log; - public class ContainerIdPBImpl extends ProtoBase implements ContainerId { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Sep 5 19:49:47 2011 @@ -167,6 +167,11 @@ implements ResourceScheduler, CapacitySc } @Override + public Resource getClusterResources() { + return clusterResource; + } + + @Override public synchronized void reinitialize(Configuration conf, ContainerTokenSecretManager containerTokenSecretManager, RMContext rmContext) throws IOException { @@ -621,6 +626,7 @@ implements ResourceScheduler, CapacitySc private synchronized void addNode(RMNode nodeManager) { this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + root.updateClusterResource(clusterResource); ++numNodeManagers; LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); @@ -629,6 +635,7 @@ implements ResourceScheduler, CapacitySc private synchronized void removeNode(RMNode nodeInfo) { SchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability()); + root.updateClusterResource(clusterResource); --numNodeManagers; // Remove running containers Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Mon Sep 5 19:49:47 2011 @@ -50,6 +50,10 @@ public class CapacitySchedulerConfigurat PREFIX + "maximum-applications"; @Private + public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT = + PREFIX + "maximum-am-resource-percent"; + + @Private public static final String QUEUES = "queues"; @Private @@ -83,6 +87,10 @@ public class CapacitySchedulerConfigurat public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @Private + public static final float + DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f; + + @Private public static final int UNDEFINED = -1; @Private @@ -124,6 +132,11 @@ public class CapacitySchedulerConfigurat return maxApplications; } + public float getMaximumApplicationMasterResourcePercent() { + return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, + DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT); + } + public int getCapacity(String queue) { int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED); if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) { Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Mon Sep 5 19:49:47 2011 @@ -37,4 +37,6 @@ public interface CapacitySchedulerContex int getNumClusterNodes(); RMContext getRMContext(); + + Resource getClusterResources(); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon Sep 5 19:49:47 2011 @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -77,15 +78,22 @@ public class LeafQueue implements Queue private int maxApplications; private int maxApplicationsPerUser; + + private float maxAMResourcePercent; + private int maxActiveApplications; + private int maxActiveApplicationsPerUser; + private Resource usedResources = Resources.createResource(0); private float utilization = 0.0f; private float usedCapacity = 0.0f; private volatile int numContainers; - Set applications; + Set activeApplications; Map applicationsMap = new HashMap(); + Set pendingApplications; + private final Resource minimumAllocation; private final Resource maximumAllocation; private final float minimumAllocationFactor; @@ -108,6 +116,8 @@ public class LeafQueue implements Queue private CapacitySchedulerContext scheduler; + final static int DEFAULT_AM_RESOURCE = 2 * 1024; + public LeafQueue(CapacitySchedulerContext cs, String queueName, Queue parent, Comparator applicationComparator, Queue old) { @@ -144,6 +154,15 @@ public class LeafQueue implements Queue int maxApplicationsPerUser = (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor); + this.maxAMResourcePercent = + cs.getConfiguration().getMaximumApplicationMasterResourcePercent(); + int maxActiveApplications = + computeMaxActiveApplications(cs.getClusterResources(), + maxAMResourcePercent, absoluteCapacity); + 
int maxActiveApplicationsPerUser = + computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, + userLimitFactor); + this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class); this.queueInfo.setQueueName(queueName); this.queueInfo.setChildQueues(new ArrayList()); @@ -157,20 +176,38 @@ public class LeafQueue implements Queue maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxApplicationsPerUser, + maxActiveApplications, maxActiveApplicationsPerUser, state, acls); LOG.info("DEBUG --- LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); - this.applications = new TreeSet(applicationComparator); + this.pendingApplications = + new TreeSet(applicationComparator); + this.activeApplications = new TreeSet(applicationComparator); } + private int computeMaxActiveApplications(Resource clusterResource, + float maxAMResourcePercent, float absoluteCapacity) { + return + Math.max( + (int)((clusterResource.getMemory() / DEFAULT_AM_RESOURCE) * + maxAMResourcePercent * absoluteCapacity), + 1); + } + + private int computeMaxActiveApplicationsPerUser(int maxActiveApplications, + int userLimit, float userLimitFactor) { + return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor); + } + private synchronized void setupQueueConfigs( float capacity, float absoluteCapacity, float maxCapacity, float absoluteMaxCapacity, int userLimit, float userLimitFactor, int maxApplications, int maxApplicationsPerUser, + int maxActiveApplications, int maxActiveApplicationsPerUser, QueueState state, Map acls) { this.capacity = capacity; @@ -185,6 +222,9 @@ public class LeafQueue implements Queue this.maxApplications = maxApplications; this.maxApplicationsPerUser = maxApplicationsPerUser; + this.maxActiveApplications = maxActiveApplications; + this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser; + this.state = state; this.acls = acls; @@ -269,6 +309,22 @@ public class LeafQueue implements Queue return 
minimumAllocationFactor; } + public int getMaxApplications() { + return maxApplications; + } + + public int getMaxApplicationsPerUser() { + return maxApplicationsPerUser; + } + + public int getMaximumActiveApplications() { + return maxActiveApplications; + } + + public int getMaximumActiveApplicationsPerUser() { + return maxActiveApplicationsPerUser; + } + @Override public synchronized float getUsedCapacity() { return usedCapacity; @@ -329,10 +385,34 @@ public class LeafQueue implements Queue this.parent = parent; } + @Override public synchronized int getNumApplications() { - return applications.size(); + return getNumPendingApplications() + getNumActiveApplications(); + } + + public synchronized int getNumPendingApplications() { + return pendingApplications.size(); + } + + public synchronized int getNumActiveApplications() { + return activeApplications.size(); + } + + @Private + public synchronized int getNumApplications(String user) { + return getUser(user).getTotalApplications(); + } + + @Private + public synchronized int getNumPendingApplications(String user) { + return getUser(user).getPendingApplications(); } + @Private + public synchronized int getNumActiveApplications(String user) { + return getUser(user).getActiveApplications(); + } + public synchronized int getNumContainers() { return numContainers; } @@ -342,6 +422,16 @@ public class LeafQueue implements Queue return state; } + @Private + public int getUserLimit() { + return userLimit; + } + + @Private + public float getUserLimitFactor() { + return userLimitFactor; + } + @Override public synchronized Map getQueueAcls() { return new HashMap(acls); @@ -404,6 +494,8 @@ public class LeafQueue implements Queue leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, leafQueue.userLimit, leafQueue.userLimitFactor, leafQueue.maxApplications, leafQueue.maxApplicationsPerUser, + leafQueue.maxActiveApplications, + leafQueue.maxActiveApplicationsPerUser, leafQueue.state, leafQueue.acls); 
updateResource(clusterResource); @@ -443,7 +535,7 @@ public class LeafQueue implements Queue synchronized (this) { // Check if the queue is accepting jobs - if (state != QueueState.RUNNING) { + if (getState() != QueueState.RUNNING) { String msg = "Queue " + getQueuePath() + " is STOPPED. Cannot accept submission of application: " + application.getApplicationId(); @@ -452,7 +544,7 @@ public class LeafQueue implements Queue } // Check submission limits for queues - if (getNumApplications() >= maxApplications) { + if (getNumApplications() >= getMaxApplications()) { String msg = "Queue " + getQueuePath() + " already has " + getNumApplications() + " applications," + " cannot accept submission of application: " + @@ -463,9 +555,9 @@ public class LeafQueue implements Queue // Check submission limits for the user on this queue user = getUser(userName); - if (user.getApplications() >= maxApplicationsPerUser) { + if (user.getTotalApplications() >= getMaxApplicationsPerUser()) { String msg = "Queue " + getQueuePath() + - " already has " + user.getApplications() + + " already has " + user.getTotalApplications() + " applications from user " + userName + " cannot accept submission of application: " + application.getApplicationId(); @@ -490,17 +582,46 @@ public class LeafQueue implements Queue } } + private synchronized void activateApplications() { + for (Iterator i=pendingApplications.iterator(); + i.hasNext(); ) { + SchedulerApp application = i.next(); + + // Check queue limit + if (getNumActiveApplications() >= getMaximumActiveApplications()) { + break; + } + + // Check user limit + User user = getUser(application.getUser()); + if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) { + user.activateApplication(); + activeApplications.add(application); + i.remove(); + LOG.info("Application " + application.getApplicationId().getId() + + " from user: " + application.getUser() + + " activated in queue: " + getQueueName()); + } + } + } + private synchronized 
void addApplication(SchedulerApp application, User user) { // Accept user.submitApplication(); - applications.add(application); + pendingApplications.add(application); applicationsMap.put(application.getApplicationAttemptId(), application); + // Activate applications + activateApplications(); + LOG.info("Application added -" + " appId: " + application.getApplicationId() + " user: " + user + "," + " leaf-queue: " + getQueueName() + - " #user-applications: " + user.getApplications() + - " #queue-applications: " + getNumApplications()); + " #user-pending-applications: " + user.getPendingApplications() + + " #user-active-applications: " + user.getActiveApplications() + + " #queue-pending-applications: " + getNumPendingApplications() + + " #queue-active-applications: " + getNumActiveApplications() + ); } @Override @@ -515,20 +636,26 @@ public class LeafQueue implements Queue } public synchronized void removeApplication(SchedulerApp application, User user) { - applications.remove(application); + activeApplications.remove(application); applicationsMap.remove(application.getApplicationAttemptId()); user.finishApplication(); - if (user.getApplications() == 0) { + if (user.getTotalApplications() == 0) { users.remove(application.getUser()); } + // Check if we can activate more applications + activateApplications(); + LOG.info("Application removed -" + " appId: " + application.getApplicationId() + " user: " + application.getUser() + " queue: " + getQueueName() + - " #user-applications: " + user.getApplications() + - " #queue-applications: " + getNumApplications()); + " #user-pending-applications: " + user.getPendingApplications() + + " #user-active-applications: " + user.getActiveApplications() + + " #queue-pending-applications: " + getNumPendingApplications() + + " #queue-active-applications: " + getNumActiveApplications() + ); } private synchronized SchedulerApp getApplication( @@ -542,7 +669,7 @@ public class LeafQueue implements Queue LOG.info("DEBUG --- assignContainers:" 
+ " node=" + node.getHostName() + - " #applications=" + applications.size()); + " #applications=" + activeApplications.size()); // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); @@ -554,7 +681,7 @@ public class LeafQueue implements Queue } // Try to assign containers to applications in order - for (SchedulerApp application : applications) { + for (SchedulerApp application : activeApplications) { LOG.info("DEBUG --- pre-assignContainers for application " + application.getApplicationId()); @@ -1119,7 +1246,16 @@ public class LeafQueue implements Queue } @Override - public synchronized void updateResource(Resource clusterResource) { + public synchronized void updateClusterResource(Resource clusterResource) { + maxActiveApplications = + computeMaxActiveApplications(clusterResource, maxAMResourcePercent, + absoluteCapacity); + maxActiveApplicationsPerUser = + computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, + userLimitFactor); + } + + private synchronized void updateResource(Resource clusterResource) { float queueLimit = clusterResource.getMemory() * absoluteCapacity; setUtilization(usedResources.getMemory() / queueLimit); setUsedCapacity( @@ -1138,22 +1274,36 @@ public class LeafQueue implements Queue static class User { Resource consumed = Resources.createResource(0); - int applications = 0; + int pendingApplications = 0; + int activeApplications = 0; public Resource getConsumedResources() { return consumed; } - public int getApplications() { - return applications; + public int getPendingApplications() { + return pendingApplications; } + public int getActiveApplications() { + return activeApplications; + } + + public int getTotalApplications() { + return getPendingApplications() + getActiveApplications(); + } + public synchronized void submitApplication() { - ++applications; + ++pendingApplications; + } + + public synchronized void activateApplication() { + --pendingApplications; + ++activeApplications; } 
public synchronized void finishApplication() { - --applications; + --activeApplications; } public synchronized void assignContainer(Resource resource) { @@ -1175,4 +1325,5 @@ public class LeafQueue implements Queue parent.recoverContainer(clusterResource, application, container); } + } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Mon Sep 5 19:49:47 2011 @@ -646,7 +646,14 @@ public class ParentQueue implements Queu } @Override - public synchronized void updateResource(Resource clusterResource) { + public synchronized void updateClusterResource(Resource clusterResource) { + // Update all children + for (Queue childQueue : childQueues) { + childQueue.updateClusterResource(clusterResource); + } + } + + private synchronized void updateResource(Resource clusterResource) { float queueLimit = clusterResource.getMemory() * absoluteCapacity; setUtilization(usedResources.getMemory() / queueLimit); setUsedCapacity( Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Mon Sep 5 19:49:47 2011 @@ -190,7 +190,7 @@ extends org.apache.hadoop.yarn.server.re * Update the cluster resource for queues as we add/remove nodes * @param clusterResource the current cluster resource */ - public void updateResource(Resource clusterResource); + public void updateClusterResource(Resource clusterResource); /** * Recover the state of the queue Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml?rev=1165403&r1=1165402&r2=1165403&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml (original) +++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml Mon Sep 5 19:49:47 2011
@@ -6,6 +6,11 @@
   <property>
+    <name>yarn.capacity-scheduler.maximum-am-resource-percent</name>
+    <value>0.1</value>
+  </property>
+
+  <property>
     <name>yarn.capacity-scheduler.root.queues</name>
     <value>default</value>

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1165403&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Mon Sep 5 19:49:47 2011
@@ -0,0 +1,234 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestApplicationLimits {
+  
+  private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class);
+  final static int GB = 1024;
+
+  LeafQueue queue;
+  
+  @Before
+  public void setUp() {
+    CapacitySchedulerConfiguration csConf = 
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    
+    
+    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
+    when(csContext.getClusterResources()).thenReturn(Resources.createResource(10 * 16 * GB));
+    
+    Map<String, Queue> queues = new HashMap<String, Queue>();
+    Queue root = 
+        CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
+            queues, queues, 
+            CapacityScheduler.queueComparator, 
+            CapacityScheduler.applicationComparator,
+            TestUtils.spyHook);
+
+    
+    queue = spy(
+        new LeafQueue(csContext, A, root, 
+            CapacityScheduler.applicationComparator, null)
+        );
+
+    // Stub out ACL checks
+    doReturn(true).
+        when(queue).hasAccess(any(QueueACL.class), 
+            any(UserGroupInformation.class));
+    
+    // Some default values
+    doReturn(100).when(queue).getMaxApplications();
+    doReturn(25).when(queue).getMaxApplicationsPerUser();
+    doReturn(10).when(queue).getMaximumActiveApplications();
+    doReturn(2).when(queue).getMaximumActiveApplicationsPerUser();
+  }
+  
+  private static final String A = "a";
+  private static final String B = "b";
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+    
+    // Define top-level queues
+    conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B});
+    conf.setCapacity(CapacityScheduler.ROOT, 100);
+    
+    final String Q_A = CapacityScheduler.ROOT + "." + A;
+    conf.setCapacity(Q_A, 10);
+    
+    final String Q_B = CapacityScheduler.ROOT + "." + B;
+    conf.setCapacity(Q_B, 90);
+    
+    LOG.info("Setup top-level queues a and b");
+  }
+
+  private SchedulerApp getMockApplication(int appId, String user) {
+    SchedulerApp application = mock(SchedulerApp.class);
+    ApplicationAttemptId applicationAttemptId = 
+        TestUtils.getMockApplicationAttemptId(appId, 0);
+    doReturn(applicationAttemptId.getApplicationId()).
+        when(application).getApplicationId();
+    doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
+    doReturn(user).when(application).getUser();
+    return application;
+  }
+  
+  @Test
+  public void testLimitsComputation() throws Exception {
+    CapacitySchedulerConfiguration csConf = 
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    
+    
+    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
+    
+    // Say cluster has 100 nodes of 16G each
+    Resource clusterResource = Resources.createResource(100 * 16 * GB);
+    when(csContext.getClusterResources()).thenReturn(clusterResource);
+    
+    Map<String, Queue> queues = new HashMap<String, Queue>();
+    Queue root = 
+        CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
+            queues, queues, 
+            CapacityScheduler.queueComparator, 
+            CapacityScheduler.applicationComparator,
+            TestUtils.spyHook);
+
+    LeafQueue queue = (LeafQueue)queues.get(A);
+    
+    LOG.info("Queue 'A' -" +
+        " maxActiveApplications=" + queue.getMaximumActiveApplications() + 
+        " maxActiveApplicationsPerUser=" + 
+        queue.getMaximumActiveApplicationsPerUser());
+    int expectedMaxActiveApps = 
+        Math.max(1, 
+            (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) * 
+                csConf.getMaximumApplicationMasterResourcePercent() *
+                queue.getAbsoluteCapacity()));
+    assertEquals(expectedMaxActiveApps, 
+        queue.getMaximumActiveApplications());
+    assertEquals((int)(expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * 
+        queue.getUserLimitFactor()), 
+        queue.getMaximumActiveApplicationsPerUser());
+    
+    // Add some nodes to the cluster & test new limits
+    clusterResource = Resources.createResource(120 * 16 * GB);
+    root.updateClusterResource(clusterResource);
+    expectedMaxActiveApps = 
+        Math.max(1, 
+            (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) * 
+                csConf.getMaximumApplicationMasterResourcePercent() *
+                queue.getAbsoluteCapacity()));
+    assertEquals(expectedMaxActiveApps, 
+        queue.getMaximumActiveApplications());
+    assertEquals((int)(expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * 
+        queue.getUserLimitFactor()), 
+        queue.getMaximumActiveApplicationsPerUser());
+    
+  }
+  
+  @Test
+  public void testActiveApplicationLimits() throws Exception {
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    
+    int APPLICATION_ID = 0;
+    // Submit first application
+    SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+    queue.submitApplication(app_0, user_0, A);
+    assertEquals(1, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+
+    // Submit second application
+    SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+    queue.submitApplication(app_1, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    
+    // Submit third application, should remain pending
+    SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+    queue.submitApplication(app_2, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    
+    // Finish one application, app_2 should be activated
+    queue.finishApplication(app_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    
+    // Submit another one for user_0
+    SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+    queue.submitApplication(app_3, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    
+    // Change queue limit to be smaller so 2 users can fill it up
+    doReturn(3).when(queue).getMaximumActiveApplications();
+    
+    // Submit first app for user_1
+    SchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
+    queue.submitApplication(app_4, user_1, A);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+
+    // Submit second app for user_1, should block due to queue-limit
+    SchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
+    queue.submitApplication(app_5, user_1, A);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(2, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(1, queue.getNumPendingApplications(user_1));
+
+    // Now finish one app of user_1 so app_5 should be activated
+    queue.finishApplication(app_4, A);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+  }
+  
+  @After
+  public void tearDown() {
+  
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Mon Sep 5 19:49:47 2011
@@ -83,8 +83,12 @@ public class TestLeafQueue {
 
     csContext = mock(CapacitySchedulerContext.class);
     when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
-    when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
+    when(csContext.getMinimumResourceCapability()).
+        thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).
+        thenReturn(Resources.createResource(16*GB));
+    when(csContext.getClusterResources()).
+        thenReturn(Resources.createResource(100 * 16 * GB));
     root = 
         CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
             queues, queues, 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Mon Sep 5 19:49:47 2011
@@ -60,6 +60,8 @@ public class TestParentQueue {
         Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(16*GB));
+    when(csContext.getClusterResources()).
+        thenReturn(Resources.createResource(100 * 16 * GB));
   }
   
   private static final String A = "a";

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Mon Sep 5 19:49:47 2011
@@ -116,6 +116,13 @@ public class TestUtils {
     return request;
   }
   
+  public static ApplicationId getMockApplicationId(int appId) {
+    ApplicationId applicationId = mock(ApplicationId.class);
+    when(applicationId.getClusterTimestamp()).thenReturn(0L);
+    when(applicationId.getId()).thenReturn(appId);
+    return applicationId;
+  }
+  
   public static ApplicationAttemptId 
   getMockApplicationAttemptId(int appId, int attemptId) {
     ApplicationId applicationId = mock(ApplicationId.class);