Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 303EC200D61 for ; Tue, 5 Dec 2017 01:31:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2ED8F160BF9; Tue, 5 Dec 2017 00:31:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 28148160C05 for ; Tue, 5 Dec 2017 01:31:25 +0100 (CET) Received: (qmail 36025 invoked by uid 500); 5 Dec 2017 00:31:24 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 36016 invoked by uid 99); 5 Dec 2017 00:31:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Dec 2017 00:31:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B090AE2F03; Tue, 5 Dec 2017 00:31:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: junping_du@apache.org To: common-commits@hadoop.apache.org Date: Tue, 05 Dec 2017 00:31:24 -0000 Message-Id: In-Reply-To: <624dc2bdd4704a56a5b730b2f689dfc5@git.apache.org> References: <624dc2bdd4704a56a5b730b2f689dfc5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: YARN-7496. CS Intra-queue preemption user-limit calculations are not in line with LeafQueue user-limit calculations. (Eric Payne via wangda) archived-at: Tue, 05 Dec 2017 00:31:26 -0000 YARN-7496. CS Intra-queue preemption user-limit calculations are not in line with LeafQueue user-limit calculations. 
(Eric Payne via wangda) Change-Id: I4dada78a227408a1f2d9bc18099041aad81d67d7 (cherry picked from commit dd471425652f670786bafa03537526c19180d5ee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3bc0d0c3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3bc0d0c3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3bc0d0c3 Branch: refs/heads/branch-2.8.3 Commit: 3bc0d0c38911dd24fc39beb748099b6259f489e3 Parents: 370e08c Author: Wangda Tan Authored: Thu Nov 23 20:18:22 2017 -0800 Committer: Junping Du Committed: Mon Dec 4 16:26:04 2017 -0800 ---------------------------------------------------------------------- .../scheduler/capacity/LeafQueue.java | 37 +++----- ...cityPreemptionPolicyIntraQueueUserLimit.java | 42 +++++++++ .../scheduler/capacity/TestLeafQueue.java | 97 ++++++++++++++++++++ 3 files changed, 152 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bc0d0c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 40b7e0e..3915b1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -142,7 +142,6 @@ public class LeafQueue extends AbstractCSQueue { private Set activeUsersSet = Collections.newSetFromMap(new ConcurrentHashMap()); private float activeUsersTimesWeights = 0.0f; - private float allUsersTimesWeights = 0.0f; @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, @@ -307,7 +306,6 @@ public class LeafQueue extends AbstractCSQueue { ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey())); } activeUsersTimesWeights = sumActiveUsersTimesWeights(); - allUsersTimesWeights = sumAllUsersTimesWeights(); } /** @@ -450,7 +448,6 @@ public class LeafQueue extends AbstractCSQueue { user = new User(userName); users.put(userName, user); user.setWeight(getUserWeightFromQueue(userName)); - allUsersTimesWeights = sumAllUsersTimesWeights(); } return user; } @@ -871,7 +868,6 @@ public class LeafQueue extends AbstractCSQueue { user.finishApplication(wasActive); if (user.getTotalApplications() == 0) { users.remove(application.getUser()); - allUsersTimesWeights = sumAllUsersTimesWeights(); } // Check if we can activate more applications @@ -1298,18 +1294,20 @@ public class LeafQueue extends AbstractCSQueue { // Also, the queue's configured capacity should be higher than // queue-hard-limit * ulMin - float usersSummedByWeight; - if (forActive) { - if (activeUsersManager.getActiveUsersChanged()) { - activeUsersSet = activeUsersManager.getActiveUsersSet(); - activeUsersTimesWeights = sumActiveUsersTimesWeights(); - activeUsersManager.clearActiveUsersChanged(); - } - usersSummedByWeight = activeUsersTimesWeights; - } else { - usersSummedByWeight = allUsersTimesWeights; + if (activeUsersManager.getActiveUsersChanged()) { + activeUsersSet = activeUsersManager.getActiveUsersSet(); + activeUsersTimesWeights = sumActiveUsersTimesWeights(); + 
activeUsersManager.clearActiveUsersChanged(); } - + float usersSummedByWeight = activeUsersTimesWeights; + + // Align the preemption algorithm with the assignment algorithm. + // If calculating for preemption and the user is not active, calculate the + // limit as if the user will be preempted (since that will make it active). + if (!forActive && !activeUsersSet.contains(userName)) { + usersSummedByWeight = activeUsersTimesWeights + user.getWeight(); + } + // User limit resource is determined by: // max(currentCapacity / #activeUsers, currentCapacity * // user-limit-percentage%) @@ -1394,15 +1392,6 @@ public class LeafQueue extends AbstractCSQueue { } return count; } - - synchronized float sumAllUsersTimesWeights() { - float count = 0.0f; - for (String userName : users.keySet()) { - User user = getUser(userName); - count += user.getWeight(); - } - return count; - } @Private protected synchronized boolean canAssignToUser(Resource clusterResource, http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bc0d0c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java index 0440db3..006c3f7 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java @@ -931,4 +931,46 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } + + @Test + public void testSimpleIUserLimitntraQueuePreemptionNoOverPreemption() + throws IOException { + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // When preempting within a queue, it should preempt no more than what the + // LeafQueue will then re-assign. In this use case, containers will be + // preempted from app1, which will cause app1 and app2 to be active. + // LeafQueue will calculate MULP to be 50% because + // (100 resources / 2 active users) = 50. We expect 14 containers to be + // preempted. + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + // \tMULP + String appsConfig = + "a\t" // app1 in a + + "(1,1,n1,,65,false,0,user1)\t50;" + + "a\t" // app2 in a + + "(1,1,n1,,35,false,0,user2)\t50;" + + "a\t" // app3 in a + + "(1,1,n1,,0,false,35,user3)\t50" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app1 is over its user limit and app3 has pending requests. Should + // preempt 14 containers from app1. 
+ verify(mDisp, times(14)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bc0d0c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index e4fe03d..23fb29d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -3503,4 +3503,101 @@ public class TestLeafQueue { assertEquals(1*GB, app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize()); } + + @Test + public void testGetResourceLimitForAllUsers() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + // Set minimum-user-limit-percent for queue "a" so 3 users can be active. + csConf.setUserLimit(a.getQueuePath(), 33); + // Make sure a single user can consume the entire cluster. 
+ csConf.setUserLimitFactor(a.getQueuePath(), 15); + csConf.setMaximumCapacity(a.getQueuePath(), 100); + + when(csContext.getClusterResource()) + .thenReturn(Resources.createResource(100 * GB, 192)); + a.reinitialize(a, csContext.getClusterResource()); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + final String user_2 = "user_2"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_1, user_1); // different user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_2, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_2, user_2); // different user + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 100*GB); + + final int numNodes = 1; + Resource clusterResource = + Resources.createResource(numNodes * (100*GB), numNodes * 128); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // user_0 consumes 65% of the queue + Priority priority = TestUtils.createMockPriority(1); + for (int i=0; i < 65; i++) { + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + } + assertEquals(65*GB, 
+ app_0.getCurrentConsumption().getMemorySize()); + + // When the minimum user limit percent is set to 33%, the capacity scheduler + // will try to assign 35% of resources to each user. This is because the + // capacity scheduler leaves a slight buffer for each user. + for (int i=0; i < 35; i++) { + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); + + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + } + assertEquals(35*GB, app_1.getCurrentConsumption().getMemorySize()); + assertEquals(0, a.getActiveUsersManager().getNumActiveUsers()); + + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 35, true, + priority, recordFactory))); + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize()); + assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); + + // With one active user requesting resources (user_2), one user exactly at + // the user limit guarantee (user_1) and one user over its user limit + // (user_0), preemption should calculate the user limit to be 50% of the + // resources. Since the capacity scheduler will leave a buffer of 1 + // container, 51GB should be the amount of resources calculated for + // preemption. + Resource ulForallUsers = a.getResourceLimitForAllUsers(user_2, + clusterResource, "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assertEquals(51*GB, ulForallUsers.getMemorySize()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org