Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 99B48200BAE for ; Fri, 28 Oct 2016 19:32:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 984FE160AE4; Fri, 28 Oct 2016 17:32:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 93193160ACA for ; Fri, 28 Oct 2016 19:32:03 +0200 (CEST) Received: (qmail 16631 invoked by uid 500); 28 Oct 2016 17:32:02 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 16622 invoked by uid 99); 28 Oct 2016 17:32:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Oct 2016 17:32:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9C253E04EE; Fri, 28 Oct 2016 17:32:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jlowe@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-4963. capacity scheduler: Make number of OFF_SWITCH assignments per heartbeat configurable. Contributed by Nathan Roberts (cherry picked from commit 1eae719bcead45915977aa220324650eab3c1b9e) Date: Fri, 28 Oct 2016 17:32:02 +0000 (UTC) archived-at: Fri, 28 Oct 2016 17:32:04 -0000 Repository: hadoop Updated Branches: refs/heads/branch-2 ea9a1be10 -> b85b5c6b9 YARN-4963. capacity scheduler: Make number of OFF_SWITCH assignments per heartbeat configurable. 
Contributed by Nathan Roberts (cherry picked from commit 1eae719bcead45915977aa220324650eab3c1b9e) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b85b5c6b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b85b5c6b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b85b5c6b Branch: refs/heads/branch-2 Commit: b85b5c6b9474eb6b14bed7331b494706a3fb9155 Parents: ea9a1be Author: Jason Lowe Authored: Fri Oct 28 17:30:15 2016 +0000 Committer: Jason Lowe Committed: Fri Oct 28 17:31:29 2016 +0000 ---------------------------------------------------------------------- .../conf/capacity-scheduler.xml | 12 ++++++++ .../CapacitySchedulerConfiguration.java | 21 +++++++++++++ .../scheduler/capacity/ParentQueue.java | 30 +++++++++++++----- .../scheduler/capacity/TestParentQueue.java | 32 +++++++++++++------- 4 files changed, 77 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5c6b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 30f4eb9..6ac726e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -131,4 +131,16 @@ + <property> + <name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name> + <value>1</value> + <description> + Controls the number of OFF_SWITCH assignments allowed + during a node's heartbeat. 
Increasing this value can improve + scheduling rate for OFF_SWITCH containers. Lower values reduce + "clumping" of applications on particular nodes. The default is 1. + Legal values are 1-MAX_INT. This config is refreshable. + </description> + </property> + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5c6b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d5d1374..6db5074 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -191,6 +191,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur PREFIX + "rack-locality-full-reset"; @Private + public static final int DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT = 1; + + @Private + public static final String OFFSWITCH_PER_HEARTBEAT_LIMIT = + PREFIX + "per-node-heartbeat.maximum-offswitch-assignments"; + + @Private public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true; @Private @@ -713,6 +720,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return 
getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS); } + public int getOffSwitchPerHeartbeatLimit() { + int limit = getInt(OFFSWITCH_PER_HEARTBEAT_LIMIT, + DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT); + if (limit < 1) { + LOG.warn(OFFSWITCH_PER_HEARTBEAT_LIMIT + "(" + limit + ") < 1. Using 1."); + limit = 1; + } + return limit; + } + + public void setOffSwitchPerHeartbeatLimit(int limit) { + setInt(OFFSWITCH_PER_HEARTBEAT_LIMIT, limit); + } + public int getNodeLocalityDelay() { return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5c6b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index ffb6892..a69af6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -76,6 +76,7 @@ public class ParentQueue extends AbstractCSQueue { volatile int numApplications; private final CapacitySchedulerContext scheduler; private boolean needToResortQueuesAtNextAllocation = false; + private int offswitchPerHeartbeatLimit; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ 
-125,12 +126,16 @@ public class ParentQueue extends AbstractCSQueue { } } + offswitchPerHeartbeatLimit = + csContext.getConfiguration().getOffSwitchPerHeartbeatLimit(); + LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity() + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", absoluteMaxCapacity=" + this.queueCapacities .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + + ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit() + ", reservationsContinueLooking=" + reservationsContinueLooking); } finally { writeLock.unlock(); @@ -210,6 +215,11 @@ public class ParentQueue extends AbstractCSQueue { } + @Private + public int getOffSwitchPerHeartbeatLimit() { + return offswitchPerHeartbeatLimit; + } + private QueueUserACLInfo getUserAclInfo( UserGroupInformation user) { try { @@ -427,6 +437,7 @@ public class ParentQueue extends AbstractCSQueue { public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { + int offswitchCount = 0; try { writeLock.lock(); // if our queue cannot access this node, just return @@ -582,13 +593,18 @@ public class ParentQueue extends AbstractCSQueue { + getAbsoluteUsedCapacity()); } - // Do not assign more than one container if this isn't the root queue - // or if we've already assigned an off-switch container - if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) { + if (assignment.getType() == NodeType.OFF_SWITCH) { + offswitchCount++; + } + + // Do not assign more containers if this isn't the root queue + // or if we've already assigned enough OFF_SWITCH containers in + // this pass + if (!rootQueue || offswitchCount >= getOffSwitchPerHeartbeatLimit()) { if (LOG.isDebugEnabled()) { - if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) { - 
LOG.debug("Not assigning more than one off-switch container," - + " assignments so far: " + assignment); + if (rootQueue && offswitchCount >= getOffSwitchPerHeartbeatLimit()) { + LOG.debug("Assigned maximum number of off-switch containers: " + + offswitchCount + ", assignments so far: " + assignment); } } break; @@ -1046,4 +1062,4 @@ public class ParentQueue extends AbstractCSQueue { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5c6b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 890e998..42a8872 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import java.util.HashMap; @@ -613,6 +614,8 @@ public class TestParentQueue { public void testOffSwitchScheduling() throws Exception { // Setup queue configs 
setupSingleLevelQueues(csConf); + csConf.setOffSwitchPerHeartbeatLimit(2); + Map queues = new HashMap(); CSQueue root = @@ -643,12 +646,18 @@ public class TestParentQueue { queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage() .incPending(Resources.createResource(1 * GB)); - // Simulate B returning a container on node_0 - stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); + // Simulate returning 2 containers on node_0 before offswitch limit + stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verifyQueueMetrics(a, 0*GB, clusterResource); + InOrder allocationOrder = inOrder(a, b); + allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource), + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource), + any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G @@ -657,27 +666,28 @@ public class TestParentQueue { stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - InOrder allocationOrder = inOrder(a, b); - allocationOrder.verify(a).assignContainers(eq(clusterResource), + allocationOrder = inOrder(a, b); + allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource), any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(b).assignContainers(eq(clusterResource), + allocationOrder.verify(b, 
times(1)).assignContainers(eq(clusterResource), any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - verifyQueueMetrics(a, 2*GB, clusterResource); + verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); // Now, B should get the scheduling opportunity // since A has 2/6G while B has 2/14G, - // However, since B returns off-switch, A won't get an opportunity + // A also gets an opportunity because offswitchlimit not reached stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); - allocationOrder.verify(b).assignContainers(eq(clusterResource), + allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource), any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a).assignContainers(eq(clusterResource), + allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource), any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - verifyQueueMetrics(a, 2*GB, clusterResource); + verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org