Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 508BBD286 for ; Wed, 24 Oct 2012 13:21:58 +0000 (UTC) Received: (qmail 75557 invoked by uid 500); 24 Oct 2012 13:21:58 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 75503 invoked by uid 500); 24 Oct 2012 13:21:57 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 75466 invoked by uid 99); 24 Oct 2012 13:21:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Oct 2012 13:21:55 +0000 X-ASF-Spam-Status: No, hits=-1996.5 required=5.0 tests=ALL_TRUSTED,FILL_THIS_FORM,FILL_THIS_FORM_LONG X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Oct 2012 13:21:53 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 90AF923888FD; Wed, 24 Oct 2012 13:21:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1401668 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ hadoop-yarn/hadoop-yarn-server/hado... 
Date: Wed, 24 Oct 2012 13:21:10 -0000 To: yarn-commits@hadoop.apache.org From: tgraves@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121024132110.90AF923888FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tgraves Date: Wed Oct 24 13:21:09 2012 New Revision: 1401668 URL: http://svn.apache.org/viewvc?rev=1401668&view=rev Log: YARN-177. CapacityScheduler - adding a queue while the RM is running has wacky results (acmurthy via tgraves) Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1401668&r1=1401667&r2=1401668&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Oct 24 13:21:09 2012 @@ -174,6 +174,9 @@ Release 0.23.5 - UNRELEASED YARN-174. Modify NodeManager to pass the user's configuration even when rebooting. (vinodkv) + YARN-177. 
CapacityScheduler - adding a queue while the RM is running has + wacky results (acmurthy via tgraves) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1401668&r1=1401667&r2=1401668&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Wed Oct 24 13:21:09 2012 @@ -51,6 +51,12 @@ extends org.apache.hadoop.yarn.server.re public CSQueue getParent(); /** + * Set the parent Queue. + * @param newParentQueue new parent queue + */ + public void setParent(CSQueue newParentQueue); + + /** * Get the queue name. * @return the queue name */ @@ -195,10 +201,10 @@ extends org.apache.hadoop.yarn.server.re /** * Reinitialize the queue. 
- * @param queue new queue to re-initalize from + * @param newlyParsedQueue new queue to re-initalize from * @param clusterResource resources in the cluster */ - public void reinitialize(CSQueue queue, Resource clusterResource) + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException; /** Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1401668&r1=1401667&r2=1401668&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Oct 24 13:21:09 2012 @@ -223,7 +223,7 @@ public class LeafQueue implements CSQueu { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absCapacity = parent.getAbsoluteCapacity() * capacity; + float absCapacity = getParent().getAbsoluteCapacity() * capacity; CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity); this.capacity = capacity; @@ -256,7 +256,7 @@ public class LeafQueue implements CSQueu // Update metrics CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); LOG.info("Initializing " + 
queueName + "\n" + "capacity = " + capacity + @@ -339,10 +339,15 @@ public class LeafQueue implements CSQueu } @Override - public CSQueue getParent() { + public synchronized CSQueue getParent() { return parent; } - + + @Override + public synchronized void setParent(CSQueue newParentQueue) { + this.parent = (ParentQueue)newParentQueue; + } + @Override public String getQueueName() { return queueName; @@ -350,7 +355,7 @@ public class LeafQueue implements CSQueu @Override public String getQueuePath() { - return parent.getQueuePath() + "." + getQueueName(); + return getParent().getQueuePath() + "." + getQueueName(); } /** @@ -430,7 +435,9 @@ public class LeafQueue implements CSQueu synchronized void setMaxCapacity(float maximumCapacity) { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); + float absMaxCapacity = + CSQueueUtils.computeAbsoluteMaximumCapacity( + maximumCapacity, getParent()); CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); this.maximumCapacity = maximumCapacity; @@ -453,10 +460,6 @@ public class LeafQueue implements CSQueu this.userLimitFactor = userLimitFactor; } - synchronized void setParentQueue(CSQueue parent) { - this.parent = parent; - } - @Override public synchronized int getNumApplications() { return getNumPendingApplications() + getNumActiveApplications(); @@ -559,26 +562,28 @@ public class LeafQueue implements CSQueu } @Override - public synchronized void reinitialize(CSQueue queue, Resource clusterResource) + public synchronized void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { // Sanity check - if (!(queue instanceof LeafQueue) || - !queue.getQueuePath().equals(getQueuePath())) { + if (!(newlyParsedQueue instanceof LeafQueue) || + !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { throw new IOException("Trying to 
reinitialize " + getQueuePath() + - " from " + queue.getQueuePath()); + " from " + newlyParsedQueue.getQueuePath()); } - LeafQueue leafQueue = (LeafQueue)queue; + LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue; setupQueueConfigs( clusterResource, - leafQueue.capacity, leafQueue.absoluteCapacity, - leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, - leafQueue.userLimit, leafQueue.userLimitFactor, - leafQueue.maxApplications, - leafQueue.getMaxApplicationsPerUser(), - leafQueue.getMaximumActiveApplications(), - leafQueue.getMaximumActiveApplicationsPerUser(), - leafQueue.state, leafQueue.acls); + newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, + newlyParsedLeafQueue.maximumCapacity, + newlyParsedLeafQueue.absoluteMaxCapacity, + newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor, + newlyParsedLeafQueue.maxApplications, + newlyParsedLeafQueue.getMaxApplicationsPerUser(), + newlyParsedLeafQueue.getMaximumActiveApplications(), + newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), + newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls); } @Override @@ -591,7 +596,7 @@ public class LeafQueue implements CSQueu } // Check if parent-queue allows access - return parent.hasAccess(acl, user); + return getParent().hasAccess(acl, user); } @Override @@ -649,10 +654,10 @@ public class LeafQueue implements CSQueu // Inform the parent queue try { - parent.submitApplication(application, userName, queue); + getParent().submitApplication(application, userName, queue); } catch (AccessControlException ace) { LOG.info("Failed to submit application to parent-queue: " + - parent.getQueuePath(), ace); + getParent().getQueuePath(), ace); removeApplication(application, user); throw ace; } @@ -708,7 +713,7 @@ public class LeafQueue implements CSQueu } // Inform the parent queue - parent.finishApplication(application, queue); + getParent().finishApplication(application, queue); } public synchronized void 
removeApplication(FiCaSchedulerApp application, User user) { @@ -1351,7 +1356,7 @@ public class LeafQueue implements CSQueu } // Inform the parent queue - parent.completedContainer(clusterResource, application, + getParent().completedContainer(clusterResource, application, node, rmContainer, null, event); } } @@ -1361,7 +1366,7 @@ public class LeafQueue implements CSQueu // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); ++numContainers; // Update user metrics @@ -1386,7 +1391,7 @@ public class LeafQueue implements CSQueu // Update queue metrics Resources.subtractFrom(usedResources, resource); CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); --numContainers; // Update user metrics @@ -1417,7 +1422,7 @@ public class LeafQueue implements CSQueu // Update metrics CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); // Update application properties for (FiCaSchedulerApp application : activeApplications) { @@ -1488,7 +1493,7 @@ public class LeafQueue implements CSQueu synchronized (this) { allocateResource(clusterResource, application, container.getResource()); } - parent.recoverContainer(clusterResource, application, container); + getParent().recoverContainer(clusterResource, application, container); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1401668&r1=1401667&r2=1401668&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Oct 24 13:21:09 2012 @@ -60,7 +60,7 @@ public class ParentQueue implements CSQu private static final Log LOG = LogFactory.getLog(ParentQueue.class); - private final CSQueue parent; + private CSQueue parent; private final String queueName; private float capacity; @@ -216,11 +216,16 @@ public class ParentQueue implements CSQu } @Override - public CSQueue getParent() { + public synchronized CSQueue getParent() { return parent; } @Override + public synchronized void setParent(CSQueue newParentQueue) { + this.parent = (ParentQueue)newParentQueue; + } + + @Override public String getQueueName() { return queueName; } @@ -357,37 +362,52 @@ public class ParentQueue implements CSQu } @Override - public synchronized void reinitialize(CSQueue queue, Resource clusterResource) + public synchronized void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { // Sanity check - if (!(queue instanceof ParentQueue) || - !queue.getQueuePath().equals(getQueuePath())) { + if (!(newlyParsedQueue instanceof ParentQueue) || + !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + queue.getQueuePath()); + " 
from " + newlyParsedQueue.getQueuePath()); } - ParentQueue parentQueue = (ParentQueue)queue; + ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue; // Set new configs setupQueueConfigs(clusterResource, - parentQueue.capacity, parentQueue.absoluteCapacity, - parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity, - parentQueue.state, parentQueue.acls); + newlyParsedParentQueue.capacity, + newlyParsedParentQueue.absoluteCapacity, + newlyParsedParentQueue.maximumCapacity, + newlyParsedParentQueue.absoluteMaxCapacity, + newlyParsedParentQueue.state, + newlyParsedParentQueue.acls); // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! Map currentChildQueues = getQueues(childQueues); - Map newChildQueues = getQueues(parentQueue.childQueues); + Map newChildQueues = + getQueues(newlyParsedParentQueue.childQueues); for (Map.Entry e : newChildQueues.entrySet()) { String newChildQueueName = e.getKey(); CSQueue newChildQueue = e.getValue(); CSQueue childQueue = currentChildQueues.get(newChildQueueName); - if (childQueue != null){ + + // Check if the child-queue already exists + if (childQueue != null) { + // Re-init existing child queues childQueue.reinitialize(newChildQueue, clusterResource); LOG.info(getQueueName() + ": re-configured queue: " + childQueue); } else { + // New child queue, do not re-init + + // Set parent to 'this' + newChildQueue.setParent(this); + + // Save in list of current child queues currentChildQueues.put(newChildQueueName, newChildQueue); + LOG.info(getQueueName() + ": added new child queue: " + newChildQueue); } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1401668&r1=1401667&r2=1401668&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Wed Oct 24 13:21:09 2012 @@ -378,4 +378,43 @@ public class TestCapacityScheduler { Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); } + + @Test + public void testRefreshQueuesWithNewQueue() throws Exception { + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + cs.setConf(new YarnConfiguration()); + cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new ClientToAMTokenSecretManagerInRM())); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + // Add a new queue b4 + String B4 = B + ".b4"; + float B4_CAPACITY = 10; + + B3_CAPACITY -= B4_CAPACITY; + try { + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setCapacity(B2, B2_CAPACITY); + conf.setCapacity(B3, B3_CAPACITY); + conf.setCapacity(B4, B4_CAPACITY); + cs.reinitialize(conf,null); + checkQueueCapacities(cs, 80f, 20f); + + // Verify parent for B4 + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueB = 
findQueue(rootQueue, B); + CSQueue queueB4 = findQueue(queueB, B4); + + assertEquals(queueB, queueB4.getParent()); + } finally { + B3_CAPACITY += B4_CAPACITY; + } + } + }