From: wangda@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 11 Aug 2017 17:31:53 -0000
Subject: [13/50] [abbrv] hadoop git commit: YARN-4161. Capacity Scheduler : Assign single or multiple containers per heart beat driven by configuration. (Wei Yan via wangda)

YARN-4161. Capacity Scheduler : Assign single or multiple containers per heart beat driven by configuration.
(Wei Yan via wangda)

Change-Id: Ic441ae4e0bf72e7232411eb54243ec143d5fd0d3

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/adb84f34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/adb84f34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/adb84f34

Branch: refs/heads/YARN-5881
Commit: adb84f34db7e1cdcd72aa8e3deb464c48da9e353
Parents: a3a9c97
Author: Wangda Tan
Authored: Mon Aug 7 11:32:12 2017 -0700
Committer: Wangda Tan
Committed: Mon Aug 7 11:32:21 2017 -0700

----------------------------------------------------------------------
 .../scheduler/capacity/CapacityScheduler.java   |  53 ++++-
 .../CapacitySchedulerConfiguration.java         |  23 ++
 .../capacity/TestCapacityScheduler.java         | 232 ++++++++++++++++++-
 3 files changed, 289 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
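The patch introduces two Capacity Scheduler settings: a switch that allows assigning more than one container per node heartbeat, and a cap on how many containers may be assigned in a single heartbeat. A minimal sketch of driving them from code, mirroring the new tests further down (the two constants come from this patch; the surrounding object is ordinary test scaffolding):

    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
    // Allow multiple containers per node heartbeat, but no more than two.
    csConf.setBoolean(CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);
    csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2);
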

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb84f34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 2ccaf63..3286982 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -94,11 +94,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidExcep
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@@ -163,6 +161,9 @@ public class CapacityScheduler extends
 
   private int offswitchPerHeartbeatLimit;
 
+  private boolean assignMultipleEnabled;
+
+  private int maxAssignPerHeartbeat;
 
   @Override
   public void setConf(Configuration conf) {
@@ -308,6 +309,9 @@ public class CapacityScheduler extends
       asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
           DEFAULT_ASYNC_SCHEDULER_INTERVAL);
 
+      this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
+      this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
+
       // number of threads for async scheduling
       int maxAsyncSchedulingThreads = this.conf.getInt(
           CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
@@ -1109,17 +1113,29 @@ public class CapacityScheduler extends
         .getAssignmentInformation().getReserved());
   }
 
-  private boolean canAllocateMore(CSAssignment assignment, int offswitchCount) {
-    if (null != assignment && Resources.greaterThan(getResourceCalculator(),
-        getClusterResource(), assignment.getResource(), Resources.none())
-        && offswitchCount < offswitchPerHeartbeatLimit) {
-      // And it should not be a reserved container
-      if (assignment.getAssignmentInformation().getNumReservations() == 0) {
-        return true;
-      }
+  private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
+      int assignedContainers) {
+    // Current assignment shouldn't be empty
+    if (assignment == null
+        || Resources.equals(assignment.getResource(), Resources.none())) {
+      return false;
     }
-    return false;
+
+    // offswitch assignment should be under threshold
+    if (offswitchCount >= offswitchPerHeartbeatLimit) {
+      return false;
+    }
+
+    // And it should not be a reserved container
+    if (assignment.getAssignmentInformation().getNumReservations() > 0) {
+      return false;
+    }
+
+    // assignMultipleEnabled should be ON,
+    // and assignedContainers should be under threshold
+    return assignMultipleEnabled
+        && (maxAssignPerHeartbeat == -1
+            || assignedContainers < maxAssignPerHeartbeat);
   }
 
   /**
@@ -1131,6 +1147,7 @@ public class CapacityScheduler extends
     FiCaSchedulerNode node = getNode(nodeId);
     if (null != node) {
       int offswitchCount = 0;
+      int assignedContainers = 0;
 
       PlacementSet ps = new SimplePlacementSet<>(node);
       CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
@@ -1141,7 +1158,13 @@
         offswitchCount++;
       }
 
-      while (canAllocateMore(assignment, offswitchCount)) {
+      if (Resources.greaterThan(calculator, getClusterResource(),
+          assignment.getResource(), Resources.none())) {
+        assignedContainers++;
+      }
+
+      while (canAllocateMore(assignment, offswitchCount,
+          assignedContainers)) {
         // Try to see if it is possible to allocate multiple container for
         // the same node heartbeat
         assignment = allocateContainersToNode(ps, true);
@@ -1150,6 +1173,12 @@
           && assignment.getType() == NodeType.OFF_SWITCH) {
           offswitchCount++;
         }
+
+        if (null != assignment
+            && Resources.greaterThan(calculator, getClusterResource(),
+                assignment.getResource(), Resources.none())) {
+          assignedContainers++;
+        }
       }
 
       if (offswitchCount >= offswitchPerHeartbeatLimit) {
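The loop above keeps calling allocateContainersToNode() within the same node heartbeat only while canAllocateMore() holds. A standalone sketch of that check, with the scheduler state passed in as plain parameters (the parameter names here are illustrative, not part of the patch):

    // Keep allocating on this heartbeat only while all of the following hold:
    //  - the last assignment actually allocated a non-empty resource,
    //  - it was not just a reservation,
    //  - the off-switch assignment limit has not been reached, and
    //  - multi-assignment is enabled and the per-heartbeat cap (-1 = unlimited) is not hit.
    static boolean canAllocateMore(boolean lastAssignmentNonEmpty,
        boolean lastAssignmentReserved, int offswitchCount,
        int offswitchPerHeartbeatLimit, boolean assignMultipleEnabled,
        int maxAssignPerHeartbeat, int assignedContainers) {
      if (!lastAssignmentNonEmpty || lastAssignmentReserved) {
        return false;
      }
      if (offswitchCount >= offswitchPerHeartbeatLimit) {
        return false;
      }
      return assignMultipleEnabled
          && (maxAssignPerHeartbeat == -1
              || assignedContainers < maxAssignPerHeartbeat);
    }
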

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb84f34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 1e29d50..13b9ff6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -301,6 +301,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
 
+  @Private
+  public static final String ASSIGN_MULTIPLE_ENABLED = PREFIX
+      + "per-node-heartbeat.multiple-assignments-enabled";
+
+  @Private
+  public static final boolean DEFAULT_ASSIGN_MULTIPLE_ENABLED = true;
+
+  /** Maximum number of containers to assign on each check-in. */
+  @Private
+  public static final String MAX_ASSIGN_PER_HEARTBEAT = PREFIX
+      + "per-node-heartbeat.maximum-container-assignments";
+
+  @Private
+  public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
+
   AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
 
   public CapacitySchedulerConfiguration() {
@@ -1473,4 +1488,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     }
     return userWeights;
   }
+
+  public boolean getAssignMultipleEnabled() {
+    return getBoolean(ASSIGN_MULTIPLE_ENABLED, DEFAULT_ASSIGN_MULTIPLE_ENABLED);
+  }
+
+  public int getMaxAssignPerHeartbeat() {
+    return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
+  }
 }
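The defaults keep multiple assignments enabled with a per-heartbeat maximum of -1 (unlimited), so existing clusters keep their current behavior unless an operator sets the new keys. The user-facing property names are the suffixes above joined with the scheduler's prefix; assuming the usual yarn.scheduler.capacity. value of PREFIX, a sketch of setting them on a plain org.apache.hadoop.conf.Configuration might look like:

    Configuration conf = new Configuration();
    // Full key names are assumed from PREFIX + the suffixes defined in this patch.
    conf.setBoolean(
        "yarn.scheduler.capacity.per-node-heartbeat.multiple-assignments-enabled", true);
    conf.setInt(
        "yarn.scheduler.capacity.per-node-heartbeat.maximum-container-assignments", 2);
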

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb84f34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index f51f771..64e0df4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -233,6 +233,17 @@ public class TestCapacityScheduler {
     }
   }
 
+  private NodeManager registerNode(ResourceManager rm, String hostName,
+      int containerManagerPort, int httpPort, String rackName,
+      Resource capability) throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName,
+        containerManagerPort, httpPort, rackName, capability, rm);
+    NodeAddedSchedulerEvent nodeAddEvent1 =
+        new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    rm.getResourceScheduler().handle(nodeAddEvent1);
+    return nm;
+  }
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
@@ -267,12 +278,12 @@ public class TestCapacityScheduler {
     }
   }
 
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
+  private NodeManager
       registerNode(String hostName, int containerManagerPort, int httpPort,
           String rackName, Resource capability)
           throws IOException, YarnException {
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
-        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
+    NodeManager nm =
+        new NodeManager(
             hostName, containerManagerPort, httpPort, rackName, capability,
             resourceManager);
     NodeAddedSchedulerEvent nodeAddEvent1 =
@@ -400,8 +411,216 @@ public class TestCapacityScheduler {
     LOG.info("--- END: testCapacityScheduler ---");
   }
 
-  private void nodeUpdate(
-      org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) {
+  @Test
+  public void testNotAssignMultiple() throws Exception {
+    LOG.info("--- START: testNotAssignMultiple ---");
+    ResourceManager rm = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setBoolean(
+        CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, false);
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    rm.init(conf);
+    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+    RMContext mC = mock(RMContext.class);
+    when(mC.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    // Register node1
+    String host0 = "host_0";
+    NodeManager nm0 =
+        registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(10 * GB, 10));
+
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+    Priority priority1 = Priority.newInstance(1);
+
+    // Submit an application
+    Application application0 = new Application("user_0", "a1", rm);
+    application0.submit();
+    application0.addNodeManager(host0, 1234, nm0);
+
+    Resource capability00 = Resources.createResource(1 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability00);
+
+    Resource capability01 = Resources.createResource(2 * GB, 1);
+    application0.addResourceRequestSpec(priority1, capability01);
+
+    Task task00 =
+        new Task(application0, priority0, new String[] {host0});
+    Task task01 =
+        new Task(application0, priority1, new String[] {host0});
+    application0.addTask(task00);
+    application0.addTask(task01);
+
+    // Submit another application
+    Application application1 = new Application("user_1", "b2", rm);
+    application1.submit();
+    application1.addNodeManager(host0, 1234, nm0);
+
+    Resource capability10 = Resources.createResource(3 * GB, 1);
+    application1.addResourceRequestSpec(priority0, capability10);
+
+    Resource capability11 = Resources.createResource(4 * GB, 1);
+    application1.addResourceRequestSpec(priority1, capability11);
+
+    Task task10 = new Task(application1, priority0, new String[] {host0});
+    Task task11 = new Task(application1, priority1, new String[] {host0});
+    application1.addTask(task10);
+    application1.addTask(task11);
+
+    // Send resource requests to the scheduler
+    application0.schedule();
+
+    application1.schedule();
+
+    // Send a heartbeat to kick the tires on the Scheduler
+    LOG.info("Kick!");
+
+    // task00, used=1G
+    nodeUpdate(rm, nm0);
+
+    // Get allocations from the scheduler
+    application0.schedule();
+    application1.schedule();
+    // 1 Task per heart beat should be scheduled
+    checkNodeResourceUsage(3 * GB, nm0); // task00 (1G)
+    checkApplicationResourceUsage(0 * GB, application0);
+    checkApplicationResourceUsage(3 * GB, application1);
+
+    // Another heartbeat
+    nodeUpdate(rm, nm0);
+    application0.schedule();
+    checkApplicationResourceUsage(1 * GB, application0);
+    application1.schedule();
+    checkApplicationResourceUsage(3 * GB, application1);
+    checkNodeResourceUsage(4 * GB, nm0);
+    LOG.info("--- START: testNotAssignMultiple ---");
+  }
+
+  @Test
+  public void testAssignMultiple() throws Exception {
+    LOG.info("--- START: testAssignMultiple ---");
+    ResourceManager rm = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setBoolean(
+        CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);
+    // Each heartbeat will assign 2 containers at most
+    csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2);
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    rm.init(conf);
+    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+    RMContext mC = mock(RMContext.class);
+    when(mC.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    // Register node1
+    String host0 = "host_0";
+    NodeManager nm0 =
+        registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(10 * GB, 10));
+
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+    Priority priority1 = Priority.newInstance(1);
+
+    // Submit an application
+    Application application0 = new Application("user_0", "a1", rm);
+    application0.submit();
+    application0.addNodeManager(host0, 1234, nm0);
+
+    Resource capability00 = Resources.createResource(1 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability00);
+
+    Resource capability01 = Resources.createResource(2 * GB, 1);
+    application0.addResourceRequestSpec(priority1, capability01);
+
+    Task task00 = new Task(application0, priority0, new String[] {host0});
+    Task task01 = new Task(application0, priority1, new String[] {host0});
+    application0.addTask(task00);
+    application0.addTask(task01);
+
+    // Submit another application
+    Application application1 = new Application("user_1", "b2", rm);
+    application1.submit();
+    application1.addNodeManager(host0, 1234, nm0);
+
+    Resource capability10 = Resources.createResource(3 * GB, 1);
+    application1.addResourceRequestSpec(priority0, capability10);
+
+    Resource capability11 = Resources.createResource(4 * GB, 1);
+    application1.addResourceRequestSpec(priority1, capability11);
+
+    Task task10 =
+        new Task(application1, priority0, new String[] {host0});
+    Task task11 =
+        new Task(application1, priority1, new String[] {host0});
+    application1.addTask(task10);
+    application1.addTask(task11);
+
+    // Send resource requests to the scheduler
+    application0.schedule();
+
+    application1.schedule();
+
+    // Send a heartbeat to kick the tires on the Scheduler
+    LOG.info("Kick!");
+
+    // task_0_0, used=1G
+    nodeUpdate(rm, nm0);
+
+    // Get allocations from the scheduler
+    application0.schedule();
+    application1.schedule();
+    // 1 Task per heart beat should be scheduled
+    checkNodeResourceUsage(4 * GB, nm0); // task00 (1G)
+    checkApplicationResourceUsage(1 * GB, application0);
+    checkApplicationResourceUsage(3 * GB, application1);
+
+    // Another heartbeat
+    nodeUpdate(rm, nm0);
+    application0.schedule();
+    checkApplicationResourceUsage(3 * GB, application0);
+    application1.schedule();
+    checkApplicationResourceUsage(7 * GB, application1);
+    checkNodeResourceUsage(10 * GB, nm0);
+    LOG.info("--- START: testAssignMultiple ---");
+  }
+
+  private void nodeUpdate(ResourceManager rm, NodeManager nm) {
+    RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    rm.getResourceScheduler().handle(nodeUpdate);
+  }
+
+  private void nodeUpdate(NodeManager nm) {
     RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
     // Send a heartbeat to kick the tires on the Scheduler
     NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
@@ -699,8 +918,7 @@ public class TestCapacityScheduler {
     Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
   }
 
-  private void checkNodeResourceUsage(int expected,
-      org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) {
+  private void checkNodeResourceUsage(int expected, NodeManager node) {
     Assert.assertEquals(expected, node.getUsed().getMemorySize());
     node.checkResourceUsage();
   }
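Read together, the two new tests pin down the per-heartbeat behavior: with multiple assignments disabled, each node heartbeat schedules exactly one container (node usage grows to 3 GB after the first heartbeat and 4 GB after the second), while with multiple assignments enabled and the cap set to 2, each heartbeat schedules two containers (node usage grows to 4 GB after the first heartbeat and 10 GB after the second).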