From: asuresh@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 30 Jan 2018 18:09:54 -0000
Message-Id: <49b7db3b22894717bab72efc85575678@git.apache.org>
In-Reply-To: <72697fcdd04b4ac18856e7c4cefe773b@git.apache.org>
References: <72697fcdd04b4ac18856e7c4cefe773b@git.apache.org>
Subject: [12/50] [abbrv] hadoop git commit: YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan.

YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan.
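At a high level, this patch makes the Capacity Scheduler's async scheduling loop aware of node liveness: each SchedulerNode now records the monotonic time of its last heartbeat, and the async pass skips any node whose last heartbeat is older than twice the NM heartbeat interval instead of continuing to place containers on a possibly dead node. The standalone Java sketch below illustrates only that staleness heuristic; the class and method names in it are illustrative, not the Hadoop classes touched by the diff that follows (the real logic is shouldSkipNodeSchedule() in CapacityScheduler.java).

import java.util.concurrent.TimeUnit;

/**
 * Illustrative sketch only: models the staleness check this patch adds to
 * async scheduling. Names here are hypothetical, not Hadoop APIs.
 */
public class StaleNodeCheckSketch {

  /** Minimal stand-in for a scheduler-side node record. */
  static final class NodeRecord {
    private volatile long lastHeartbeatMonotonicNanos = System.nanoTime();

    /** Called on every node heartbeat to refresh the timestamp. */
    void onHeartbeat() {
      lastHeartbeatMonotonicNanos = System.nanoTime();
    }

    long millisSinceLastHeartbeat() {
      return TimeUnit.NANOSECONDS.toMillis(
          System.nanoTime() - lastHeartbeatMonotonicNanos);
    }
  }

  /**
   * Skip a node that has missed roughly two heartbeats; it may be dead, so
   * the async pass should not keep placing containers on it.
   */
  static boolean shouldSkipNode(NodeRecord node, long heartbeatIntervalMs) {
    return node.millisSinceLastHeartbeat() > 2 * heartbeatIntervalMs;
  }

  public static void main(String[] args) throws InterruptedException {
    NodeRecord node = new NodeRecord();
    long heartbeatIntervalMs = 100;

    node.onHeartbeat();
    System.out.println("fresh node skipped? "
        + shouldSkipNode(node, heartbeatIntervalMs)); // false

    Thread.sleep(3 * heartbeatIntervalMs); // simulate missed heartbeats
    System.out.println("stale node skipped? "
        + shouldSkipNode(node, heartbeatIntervalMs)); // true
  }
}

Using a monotonic clock rather than wall-clock time keeps the check immune to system clock adjustments, which matches the patch's use of Time.monotonicNow().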
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9c72d04
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9c72d04
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9c72d04

Branch: refs/heads/YARN-6592
Commit: e9c72d04beddfe0252d2e81123a9fe66bdf04078
Parents: 3400d0c
Author: Sunil G
Authored: Mon Jan 29 20:43:08 2018 +0530
Committer: Sunil G
Committed: Mon Jan 29 20:44:38 2018 +0530

----------------------------------------------------------------------
 .../scheduler/AbstractYarnScheduler.java        |  51 +++---
 .../scheduler/SchedulerNode.java                |  16 ++
 .../scheduler/capacity/CapacityScheduler.java   |  49 +++++-
 .../TestRMHAForAsyncScheduler.java              |  38 ++++-
 .../TestCapacitySchedulerAsyncScheduling.java   | 159 ++++++++++++++++++-
 5 files changed, 276 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index c94c379..4b76327 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -980,11 +980,11 @@ public abstract class AbstractYarnScheduler
 
   /**
    * Get lists of new containers from NodeManager and process them.
    * @param nm The RMNode corresponding to the NodeManager
+   * @param schedulerNode schedulerNode
    * @return list of completed containers
    */
-  protected List updateNewContainerInfo(RMNode nm) {
-    SchedulerNode node = getNode(nm.getNodeID());
-
+  private List updateNewContainerInfo(RMNode nm,
+      SchedulerNode schedulerNode) {
     List containerInfoList = nm.pullContainerUpdates();
     List newlyLaunchedContainers = new ArrayList<>();
@@ -999,14 +999,15 @@ public abstract class AbstractYarnScheduler
 
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+      containerLaunchedOnNode(launchedContainer.getContainerId(),
+          schedulerNode);
     }
 
     // Processing the newly increased containers
     List newlyIncreasedContainers = nm.pullNewlyIncreasedContainers();
     for (Container container : newlyIncreasedContainers) {
-      containerIncreasedOnNode(container.getId(), node, container);
+      containerIncreasedOnNode(container.getId(), schedulerNode, container);
     }
 
     return completedContainers;
@@ -1017,12 +1018,12 @@ public abstract class AbstractYarnScheduler
    * @param completedContainers Extracted list of completed containers
    * @param releasedResources Reference resource object for completed containers
    * @param nodeId NodeId corresponding to the NodeManager
+   * @param schedulerNode schedulerNode
    * @return The total number of released containers
    */
-  protected int updateCompletedContainers(List
-      completedContainers, Resource releasedResources, NodeId nodeId) {
+  private int updateCompletedContainers(List completedContainers,
+      Resource releasedResources, NodeId nodeId, SchedulerNode schedulerNode) {
     int releasedContainers = 0;
-    SchedulerNode node = getNode(nodeId);
     List untrackedContainerIdList = new ArrayList();
     for (ContainerStatus completedContainer : completedContainers) {
       ContainerId containerId = completedContainer.getContainerId();
@@ -1030,8 +1031,8 @@ public abstract class AbstractYarnScheduler
       RMContainer container = getRMContainer(containerId);
       completedContainer(container, completedContainer,
           RMContainerEventType.FINISHED);
-      if (node != null) {
-        node.releaseContainer(containerId, true);
+      if (schedulerNode != null) {
+        schedulerNode.releaseContainer(containerId, true);
       }
 
       if (container != null) {
@@ -1076,14 +1077,14 @@ public abstract class AbstractYarnScheduler
   /**
    * Update container and utilization information on the NodeManager.
    * @param nm The NodeManager to update
+   * @param schedulerNode schedulerNode
    */
-  protected void updateNodeResourceUtilization(RMNode nm) {
-    SchedulerNode node = getNode(nm.getNodeID());
+  protected void updateNodeResourceUtilization(RMNode nm,
+      SchedulerNode schedulerNode) {
     // Updating node resource utilization
-    node.setAggregatedContainersUtilization(
+    schedulerNode.setAggregatedContainersUtilization(
         nm.getAggregatedContainersUtilization());
-    node.setNodeUtilization(nm.getNodeUtilization());
-
+    schedulerNode.setNodeUtilization(nm.getNodeUtilization());
   }
 
   /**
@@ -1097,12 +1098,17 @@ public abstract class AbstractYarnScheduler
     }
 
     // Process new container information
-    List completedContainers = updateNewContainerInfo(nm);
+    SchedulerNode schedulerNode = getNode(nm.getNodeID());
+    List completedContainers = updateNewContainerInfo(nm,
+        schedulerNode);
+
+    // Notify Scheduler Node updated.
+    schedulerNode.notifyNodeUpdate();
 
     // Process completed containers
     Resource releasedResources = Resource.newInstance(0, 0);
     int releasedContainers = updateCompletedContainers(completedContainers,
-        releasedResources, nm.getNodeID());
+        releasedResources, nm.getNodeID(), schedulerNode);
 
     // If the node is decommissioning, send an update to have the total
     // resource equal to the used resource, so no available resource to
@@ -1115,18 +1121,17 @@ public abstract class AbstractYarnScheduler
           .getEventHandler()
           .handle(
               new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
-                  .newInstance(getSchedulerNode(nm.getNodeID())
-                      .getAllocatedResource(), 0)));
+                  .newInstance(schedulerNode.getAllocatedResource(), 0)));
     }
 
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
-    updateNodeResourceUtilization(nm);
+    updateNodeResourceUtilization(nm, schedulerNode);
 
     // Now node data structures are up-to-date and ready for scheduling.
     if(LOG.isDebugEnabled()) {
-      SchedulerNode node = getNode(nm.getNodeID());
-      LOG.debug("Node being looked for scheduling " + nm +
-          " availableResource: " + node.getUnallocatedResource());
+      LOG.debug(
+          "Node being looked for scheduling " + nm + " availableResource: "
+              + schedulerNode.getUnallocatedResource());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 05dbf1e..89f748d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -76,6 +77,9 @@ public abstract class SchedulerNode {
 
   private volatile Set labels = null;
 
+  // Last updated time
+  private volatile long lastHeartbeatMonotonicTime;
+
   public SchedulerNode(RMNode node, boolean usePortForNodeName,
       Set labels) {
     this.rmNode = node;
@@ -87,6 +91,7 @@ public abstract class SchedulerNode {
       nodeName = rmNode.getHostName();
     }
     this.labels = ImmutableSet.copyOf(labels);
+    this.lastHeartbeatMonotonicTime = Time.monotonicNow();
   }
 
   public SchedulerNode(RMNode node, boolean usePortForNodeName) {
@@ -453,6 +458,17 @@ public abstract class SchedulerNode {
     return this.nodeUtilization;
   }
 
+  public long getLastHeartbeatMonotonicTime() {
+    return lastHeartbeatMonotonicTime;
+  }
+
+  /**
+   * This will be called for each node heartbeat.
+   */
+  public void notifyNodeUpdate() {
+    this.lastHeartbeatMonotonicTime = Time.monotonicNow();
+  }
+
   private static class ContainerInfo {
     private final RMContainer container;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 99f4456..03ca507 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -142,7 +142,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleC
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -181,8 +180,6 @@ public class CapacityScheduler extends
 
   private CSConfigurationProvider csConfProvider;
 
-  protected Clock monotonicClock;
-
   @Override
   public void setConf(Configuration conf) {
     yarnConf = conf;
@@ -243,6 +240,8 @@ public class CapacityScheduler extends
   private RMNodeLabelsManager labelManager;
   private AppPriorityACLsManager appPriorityACLManager;
 
+  private static boolean printedVerboseLoggingForAsyncScheduling = false;
+
   /**
    * EXPERT
    */
@@ -471,6 +470,22 @@ public class CapacityScheduler extends
   private final static Random random = new Random(System.currentTimeMillis());
 
+  private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
+      CapacityScheduler cs, boolean printVerboseLog) {
+    // Skip node which missed 2 heartbeats since the node might be dead and
+    // we should not continue allocate containers on that.
+    long timeElapsedFromLastHeartbeat =
+        Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+    if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) {
+      if (printVerboseLog && LOG.isDebugEnabled()) {
+        LOG.debug("Skip scheduling on node because it haven't heartbeated for "
+            + timeElapsedFromLastHeartbeat / 1000.0f + " secs");
+      }
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Schedule on all nodes by starting at a random point.
    * @param cs
    */
@@ -481,16 +496,42 @@ public class CapacityScheduler extends
     Collection nodes = cs.nodeTracker.getAllNodes();
     int start = random.nextInt(nodes.size());
+
+    // To avoid too verbose DEBUG logging, only print debug log once for
+    // every 10 secs.
+    boolean printSkipedNodeLogging = false;
+    if (Time.monotonicNow() / 1000 % 10 == 0) {
+      printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
+    } else {
+      printedVerboseLoggingForAsyncScheduling = false;
+    }
+
+    // Allocate containers of node [start, end)
     for (FiCaSchedulerNode node : nodes) {
       if (current++ >= start) {
+        if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+          continue;
+        }
         cs.allocateContainersToNode(node.getNodeID(), false);
       }
     }
-    // Now, just get everyone to be safe
+
+    current = 0;
+
+    // Allocate containers of node [0, start)
     for (FiCaSchedulerNode node : nodes) {
+      if (current++ > start) {
+        break;
+      }
+      if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+        continue;
+      }
       cs.allocateContainersToNode(node.getNodeID(), false);
     }
 
+    if (printSkipedNodeLogging) {
+      printedVerboseLoggingForAsyncScheduling = true;
+    }
+
     Thread.sleep(cs.getAsyncScheduleInterval());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
index 46d5cda..36f1762 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
@@ -28,13 +28,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class TestRMHAForAsyncScheduler extends RMHATestBase {
+  private TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread
+      nmHeartbeatThread = null;
 
   @Before
   @Override
@@ -57,26 +63,49 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
   }
 
+  private void keepNMHeartbeat(List mockNMs, int interval) {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+    nmHeartbeatThread =
+        new TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread(mockNMs,
+            interval);
+    nmHeartbeatThread.start();
+  }
+
+  private void pauseNMHeartbeat() {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+  }
+
   @Test(timeout = 60000)
   public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
     // start two RMs, and transit rm1 to active, rm2 to standby
     startRMs();
     // register NM
-    rm1.registerNode("h1:1234", 8192, 8);
+    MockNM nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
     // submit app1 and check
     RMApp app1 = submitAppAndCheckLaunched(rm1);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
 
     // failover RM1 to RM2
     explicitFailover();
     checkAsyncSchedulerThreads(Thread.currentThread());
+    pauseNMHeartbeat();
 
     // register NM, kill app1
-    rm2.registerNode("h1:1234", 8192, 8);
+    nm = rm2.registerNode("192.1.1.1:1234", 8192, 8);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
+
     rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
         RMAppAttemptState.LAUNCHED);
     rm2.killApp(app1.getApplicationId());
     // submit app3 and check
     RMApp app2 = submitAppAndCheckLaunched(rm2);
+    pauseNMHeartbeat();
 
     // failover RM2 to RM1
     HAServiceProtocol.StateChangeRequestInfo requestInfo =
@@ -92,12 +121,15 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
     checkAsyncSchedulerThreads(Thread.currentThread());
 
     // register NM, kill app2
-    rm1.registerNode("h1:1234", 8192, 8);
+    nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
+
     rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
         RMAppAttemptState.LAUNCHED);
     rm1.killApp(app2.getApplicationId());
     // submit app3 and check
     submitAppAndCheckLaunched(rm1);
+    pauseNMHeartbeat();
 
     rm1.stop();
     rm2.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 77596e2..548b909 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
@@ -72,6 +73,8 @@ public class TestCapacitySchedulerAsyncScheduling {
 
   RMNodeLabelsManager mgr;
 
+  private NMHeartbeatThread nmHeartbeatThread = null;
+
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
@@ -122,9 +125,11 @@ public class TestCapacitySchedulerAsyncScheduling {
     List nms = new ArrayList<>();
     // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
     for (int i = 0; i < 10; i++) {
-      nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB));
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
     }
 
+    keepNMHeartbeat(nms, 1000);
+
     List ams = new ArrayList<>();
     // Add 3 applications to the cluster, one app in one queue
     // the i-th app ask (20 * i) containers. So in total we will have
@@ -185,8 +190,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 9 * GB);
     List nmLst = new ArrayList<>();
     nmLst.add(nm1);
     nmLst.add(nm2);
@@ -277,8 +282,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("127.0.0.2:2234", 9 * GB);
 
     // init scheduler nodes
     int waitTime = 1000;
@@ -416,8 +421,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 9 * GB);
     List nmLst = new ArrayList<>();
     nmLst.add(nm1);
     nmLst.add(nm2);
@@ -476,6 +481,146 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
   }
 
+  /**
+   * Make sure scheduler skips NMs which haven't heartbeat for a while.
+   * @throws Exception
+   */
+  @Test
+  public void testAsyncSchedulerSkipNoHeartbeatNMs() throws Exception {
+    int heartbeatInterval = 100;
+    conf.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        1);
+    conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 100);
+    // Heartbeat interval is 100 ms.
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+        heartbeatInterval);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // inject node label manager
+    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    List nms = new ArrayList<>();
+    // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
+    for (int i = 0; i < 10; i++) {
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
+    }
+
+    List ams = new ArrayList<>();
+
+    keepNMHeartbeat(nms, heartbeatInterval);
+
+    for (int i = 0; i < 3; i++) {
+      RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
+          Character.toString((char) (i % 34 + 97)), 1, null, null, false);
+      MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+      am.registerAppAttempt();
+      ams.add(am);
+    }
+
+    pauseNMHeartbeat();
+
+    Thread.sleep(heartbeatInterval * 3);
+
+    // Applications request containers.
+    for (int i = 0; i < 3; i++) {
+      ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
+    }
+
+    for (int i = 0; i < 5; i++) {
+      // Do heartbeat for NM 0-4
+      nms.get(i).nodeHeartbeat(true);
+    }
+
+    // Wait for 2000 ms.
+    Thread.sleep(2000);
+
+    // Make sure that NM5-9 don't have non-AM containers.
+    for (int i = 0; i < 9; i++) {
+      if (i < 5) {
+        Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) > 0);
+      } else {
+        Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) == 0);
+      }
+    }
+
+    rm.close();
+  }
+
+  public static class NMHeartbeatThread extends Thread {
+    private List mockNMS;
+    private int interval;
+    private volatile boolean shouldStop = false;
+
+    public NMHeartbeatThread(List mockNMs, int interval) {
+      this.mockNMS = mockNMs;
+      this.interval = interval;
+    }
+
+    public void run() {
+      while (true) {
+        if (shouldStop) {
+          break;
+        }
+        for (MockNM nm : mockNMS) {
+          try {
+            nm.nodeHeartbeat(true);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    public void setShouldStop() {
+      shouldStop = true;
+    }
+  }
+
+  private void keepNMHeartbeat(List mockNMs, int interval) {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+    nmHeartbeatThread = new NMHeartbeatThread(mockNMs, interval);
+    nmHeartbeatThread.start();
+  }
+
+  private void pauseNMHeartbeat() {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+  }
+
+  private int checkNumNonAMContainersOnNode(CapacityScheduler cs, MockNM nm) {
+    SchedulerNode node = cs.getNode(nm.getNodeId());
+    int nonAMContainer = 0;
+    for (RMContainer c : node.getCopiedListOfRunningContainers()) {
+      if (!c.isAMContainer()) {
+        nonAMContainer++;
+      }
+    }
+    return nonAMContainer;
+  }
+
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)
       throws Exception {
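For readers who want to see the traversal order in isolation: the async pass in CapacityScheduler.java above picks a random start offset into the node collection, allocates over [start, end), then wraps around to cover [0, start), applying the same staleness skip in both passes. The self-contained sketch below mirrors that two-pass scan with illustrative types (plain strings and a predicate) rather than the actual FiCaSchedulerNode/CapacityScheduler APIs; as in the committed loop, the node at the start index can be visited by both passes.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

/** Illustrative sketch of a random-start, wrap-around scheduling pass. */
public class RandomStartScanSketch {

  private static final Random RANDOM = new Random();

  /** Returns the node names visited, in order, skipping "stale" ones. */
  static List<String> scheduleOnePass(List<String> nodes,
      Predicate<String> isStale) {
    List<String> visited = new ArrayList<>();
    int start = RANDOM.nextInt(nodes.size());

    // First pass: nodes [start, end).
    int current = 0;
    for (String node : nodes) {
      if (current++ >= start) {
        if (isStale.test(node)) {
          continue; // skip nodes that stopped heartbeating
        }
        visited.add(node);
      }
    }

    // Second pass: wrap around and cover nodes [0, start].
    current = 0;
    for (String node : nodes) {
      if (current++ > start) {
        break;
      }
      if (isStale.test(node)) {
        continue;
      }
      visited.add(node);
    }
    return visited;
  }

  public static void main(String[] args) {
    List<String> nodes = List.of("n0", "n1", "n2", "n3", "n4");
    // n2 is treated as stale and skipped; the node at the random start
    // index is covered by both passes.
    System.out.println(scheduleOnePass(nodes, "n2"::equals));
  }
}

Starting at a random offset spreads allocation pressure across the cluster instead of always favoring whichever nodes happen to come first in the collection.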