From: wangda@apache.org
To: common-commits@hadoop.apache.org
Message-Id: <5024bc70b2944031a5ccccc4d0ca3891@git.apache.org>
Subject: hadoop git commit: YARN-4484. Available Resource calculation for a queue is not correct when used with labels. (Sunil G via wangda)
Date: Fri, 15 Jul 2016 18:46:27 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 83126ac90 -> 343633a6e


YARN-4484. Available Resource calculation for a queue is not correct when used with labels.
(Sunil G via wangda)

(cherry picked from commit 24db9167f16ba643a186624b33a6b9b80020f476)
(cherry picked from commit e34e1aa4fe0a0826439227175fc3321f840dddd4)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/343633a6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/343633a6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/343633a6

Branch: refs/heads/branch-2.8
Commit: 343633a6e837ba58ec0064906ecd4454811c1673
Parents: 83126ac
Author: Wangda Tan
Authored: Fri Jul 15 11:40:12 2016 -0700
Committer: Wangda Tan
Committed: Fri Jul 15 11:46:17 2016 -0700

----------------------------------------------------------------------
 .../scheduler/capacity/CSQueueUtils.java        |  55 ++---
 .../TestNodeLabelContainerAllocation.java       | 212 +++++++++++++++++++
 2 files changed, 242 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/343633a6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index c402784..5e9a5af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -214,24 +214,34 @@ class CSQueueUtils {
     queueCapacities.setUsedCapacity(nodePartition, usedCapacity);
   }
 
-  private static Resource getNonPartitionedMaxAvailableResourceToQueue(
-      final ResourceCalculator rc, Resource totalNonPartitionedResource,
-      CSQueue queue) {
-    Resource queueLimit = Resources.none();
-    Resource usedResources = queue.getUsedResources();
+  private static Resource getMaxAvailableResourceToQueue(
+      final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue,
+      Resource cluster) {
+    Set<String> nodeLabels = queue.getNodeLabelsForQueue();
+    Resource totalAvailableResource = Resources.createResource(0, 0);
 
-    if (Resources.greaterThan(rc, totalNonPartitionedResource,
-        totalNonPartitionedResource, Resources.none())) {
-      queueLimit =
-          Resources.multiply(totalNonPartitionedResource,
-              queue.getAbsoluteCapacity());
-    }
+    for (String partition : nodeLabels) {
+      // Calculate guaranteed resource for a label in a queue by below logic.
+      // (total label resource) * (absolute capacity of label in that queue)
+      Resource queueGuranteedResource = Resources.multiply(nlm
+          .getResourceByLabel(partition, cluster), queue.getQueueCapacities()
+          .getAbsoluteCapacity(partition));
 
-    Resource available = Resources.subtract(queueLimit, usedResources);
-    return Resources.max(rc, totalNonPartitionedResource, available,
-        Resources.none());
+      // Available resource in queue for a specific label will be calculated as
+      // {(guaranteed resource for a label in a queue) -
+      // (resource usage of that label in the queue)}
+      // Finally accumulate this available resource to get total.
+      Resource available = (Resources.greaterThan(rc, cluster,
+          queueGuranteedResource,
+          queue.getQueueResourceUsage().getUsed(partition))) ? Resources
+          .componentwiseMax(Resources.subtractFrom(queueGuranteedResource,
+              queue.getQueueResourceUsage().getUsed(partition)), Resources
+              .none()) : Resources.none();
+      Resources.addTo(totalAvailableResource, available);
+    }
+    return totalAvailableResource;
   }
-  
+
   /**
    * <p>
    * Update Queue Statistics:
@@ -264,15 +274,10 @@ class CSQueueUtils {
       updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
           minimumAllocation, queueResourceUsage, queueCapacities,
           nodePartition);
     }
-
-    // Now in QueueMetrics, we only store available-resource-to-queue for
-    // default partition.
-    if (nodePartition == null
-        || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      childQueue.getMetrics().setAvailableResourcesToQueue(
-          getNonPartitionedMaxAvailableResourceToQueue(rc,
-              nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster),
-              childQueue));
-    }
+
+    // Update queue metrics w.r.t node labels. In a generic way, we can
+    // calculate available resource from all labels in cluster.
+    childQueue.getMetrics().setAvailableResourcesToQueue(
+        getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
   }
 }
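In words, the new getMaxAvailableResourceToQueue() sums, over every partition the queue can access, max(guaranteed - used, 0), where guaranteed = (total resource of the partition) * (queue's absolute capacity on that partition). Below is a minimal, self-contained sketch of that arithmetic using plain MB values instead of the Resource/ResourceCalculator machinery; the class, method, and parameter names are illustrative assumptions for this note, not part of the patch or of the CSQueueUtils API.

// Illustrative sketch only: plain longs (MB) stand in for Resource /
// ResourceCalculator; the names below are invented for this example.
import java.util.Map;

public class AvailableToQueueSketch {

  // Sum over partitions of max(guaranteed - used, 0), where
  // guaranteed = (total resource of the partition) * (queue's absolute capacity on it).
  static long availableToQueueMB(Map<String, Long> partitionTotalMB,
      Map<String, Float> absCapacityByPartition,
      Map<String, Long> usedMBByPartition) {
    long total = 0;
    for (String partition : absCapacityByPartition.keySet()) {
      long guaranteed = (long) (partitionTotalMB.getOrDefault(partition, 0L)
          * absCapacityByPartition.get(partition));
      long used = usedMBByPartition.getOrDefault(partition, 0L);
      total += Math.max(guaranteed - used, 0); // never report negative headroom
    }
    return total;
  }

  public static void main(String[] args) {
    // Numbers match the second new test below (testQueueMetricsWithLabelsOnDefaultLabelNode):
    // queue "a" holds 25% of the default partition and 50% of partition x, each 10 GB,
    // with 1 GB used on the default partition and 3 GB used on x.
    long availableMB = availableToQueueMB(
        Map.of("", 10240L, "x", 10240L),
        Map.of("", 0.25f, "x", 0.5f),
        Map.of("", 1024L, "x", 3072L));
    System.out.println(availableMB); // 1536 + 2048 = 3584 MB, i.e. the 3.5 GB the test expects
  }
}

Clamping each partition at zero before summing keeps one over-committed partition from hiding headroom that still exists under another label, which the old default-partition-only metric could not express.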
http://git-wip-us.apache.org/repos/asf/hadoop/blob/343633a6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 626acd8..dc0a17d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@@ -1676,4 +1679,213 @@ public class TestNodeLabelContainerAllocation {
     checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
         cs.getApplicationAttempt(am1.getApplicationAttemptId()));
   }
+
+  @Test
+  public void testQueueMetricsWithLabels() throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     *
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)     (x)
+     * </pre>
+     *
+     * a/b can access x, both of them has max-capacity-on-x = 50
+     *
+     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
+     * resource.
+     */
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 25);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 50);
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(queueB, 75);
+    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
+    csConf.setCapacityByLabel(queueB, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueB, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", false)));
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("y", false)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // app1 asks for 5 partition=x containers
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
+    // NM1 do 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 gets all resource in partition=x
+    Assert.assertEquals(5, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(5 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(5 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(10 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
+    assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
+    rm1.close();
+  }
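Why the test above expects getAvailableMB() == 0 for queue "a" even though the partition-y node still has 10 GB free: queue "a" cannot access partition y, there are no unlabeled nodes, and its 5 GB guarantee on x is fully used. A quick cross-check with the illustrative helper sketched after the CSQueueUtils hunk (again an assumption for this note, not ResourceManager code):

// Hypothetical re-check of the metrics asserted in testQueueMetricsWithLabels,
// reusing the illustrative AvailableToQueueSketch helper from the earlier note.
long availableMB = AvailableToQueueSketch.availableToQueueMB(
    Map.of("", 0L, "x", 10240L),   // no unlabeled nodes; h2 belongs to partition y
    Map.of("", 0.25f, "x", 0.5f),  // queue a: 25% of default partition, 50% of x
    Map.of("", 0L, "x", 5120L));   // AM (1 GB) + 4 task containers, capped by the 50% max-capacity
// availableMB == 0, matching leafQueue.getMetrics().getAvailableMB() above.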
+
+  @Test
+  public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     *
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)     (x)
+     * </pre>
+     *
+     * a/b can access x, both of them has max-capacity-on-x = 50
+     *
+     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
+     * resource.
+     */
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 25);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 50);
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(queueB, 75);
+    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
+    csConf.setCapacityByLabel(queueB, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueB, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label =
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // app1 asks for 3 partition= containers
+    am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
+
+    // NM1 do 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 gets all resource in partition=x (non-exclusive)
+    Assert.assertEquals(3, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(7 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(9 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
+    double delta = 0.0001;
+    // 3GB is used from label x quota. 1.5 GB is remaining from default label.
+    // 2GB is remaining from label x.
+    assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB());
+
+    // app1 asks for 1 default partition container
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+
+    // NM2 do couple of heartbeats
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // app1 gets all resource in default partition
+    Assert.assertEquals(2, schedulerNode2.getNumContainers());
+
+    // 3GB is used from label x quota. 2GB used from default label.
+    // So total 2.5 GB is remaining.
+    assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
+
+    rm1.close();
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org