Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2961820049D for ; Wed, 9 Aug 2017 19:36:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 27CC0169A91; Wed, 9 Aug 2017 17:36:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 205EC169A8D for ; Wed, 9 Aug 2017 19:36:38 +0200 (CEST) Received: (qmail 26298 invoked by uid 500); 9 Aug 2017 17:36:25 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 24177 invoked by uid 99); 9 Aug 2017 17:36:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Aug 2017 17:36:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BC853F556E; Wed, 9 Aug 2017 17:36:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stevel@apache.org To: common-commits@hadoop.apache.org Date: Wed, 09 Aug 2017 17:36:25 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/51] [abbrv] hadoop git commit: YARN-6678. Handle IllegalStateException in Async Scheduling mode of CapacityScheduler. Contributed by Tao Yang. archived-at: Wed, 09 Aug 2017 17:36:40 -0000 YARN-6678. Handle IllegalStateException in Async Scheduling mode of CapacityScheduler. Contributed by Tao Yang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f64cfeaf Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f64cfeaf Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f64cfeaf Branch: refs/heads/HADOOP-13345 Commit: f64cfeaf61ec65a465decdd8215f567d4e6677a9 Parents: 79df1e7 Author: Sunil G Authored: Thu Aug 3 19:27:10 2017 +0530 Committer: Sunil G Committed: Thu Aug 3 19:27:10 2017 +0530 ---------------------------------------------------------------------- .../scheduler/common/fica/FiCaSchedulerApp.java | 13 ++ .../TestCapacitySchedulerAsyncScheduling.java | 147 +++++++++++++++++++ 2 files changed, 160 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f64cfeaf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index ad4c8ce..17bb104 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -426,6 +426,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // accepted & confirmed, it will become RESERVED state if (schedulerContainer.getRmContainer().getState() == RMContainerState.RESERVED) { + // Check if node currently reserved by other application, there may + // be some outdated proposals in async-scheduling environment + if (schedulerContainer.getRmContainer() != schedulerContainer + .getSchedulerNode().getReservedContainer()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Try to re-reserve a container, but node " + + schedulerContainer.getSchedulerNode() + + " is already reserved by another container" + + schedulerContainer.getSchedulerNode() + .getReservedContainer().getContainerId()); + } + return false; + } // Set reReservation == true reReservation = true; } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f64cfeaf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 0eb89d7..0c3130d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -41,20 +44,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; public class TestCapacitySchedulerAsyncScheduling { private final int GB = 1024; @@ -257,6 +266,144 @@ public class TestCapacitySchedulerAsyncScheduling { rm.stop(); } + // Testcase for YARN-6678 + @Test(timeout = 30000) + public void testCommitOutdatedReservedProposal() throws Exception { + // disable async-scheduling for simulating complex since scene + Configuration disableAsyncConf = new Configuration(conf); + disableAsyncConf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false); + + // init RM & NMs & Nodes + final MockRM rm = new MockRM(disableAsyncConf); + rm.start(); + final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + + // init scheduler nodes + int waitTime = 1000; + while (waitTime > 0 && + ((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount() < 2) { + waitTime -= 10; + Thread.sleep(10); + } + Assert.assertEquals(2, + ((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount()); + + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + final SchedulerNode sn1 = + ((CapacityScheduler) scheduler).getSchedulerNode(nm1.getNodeId()); + final SchedulerNode sn2 = + ((CapacityScheduler) scheduler).getSchedulerNode(nm2.getNodeId()); + + // submit app1, am1 is running on nm1 + RMApp app = rm.submitApp(200, "app", "user", null, "default"); + final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + // submit app2, am2 is running on nm1 + RMApp app2 = rm.submitApp(200, "app", "user", null, "default"); + final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + // allocate and launch 2 containers for app1 + allocateAndLaunchContainers(am, nm1, rm, 1, + Resources.createResource(5 * GB), 0, 2); + allocateAndLaunchContainers(am, nm2, rm, 1, + Resources.createResource(5 * GB), 0, 3); + + // nm1 runs 3 containers(app1-container_01/AM, app1-container_02, + // app2-container_01/AM) + // nm2 runs 1 container(app1-container_03) + Assert.assertEquals(3, sn1.getNumContainers()); + Assert.assertEquals(1, sn2.getNumContainers()); + + // reserve 1 container(app1-container_04) for app1 on nm1 + ResourceRequest rr2 = ResourceRequest + .newInstance(Priority.newInstance(0), "*", + Resources.createResource(5 * GB), 1); + am.allocate(Arrays.asList(rr2), null); + nm1.nodeHeartbeat(true); + // wait app1-container_04 reserved on nm1 + waitTime = 1000; + while (waitTime > 0 && sn1.getReservedContainer() == null) { + waitTime -= 10; + Thread.sleep(10); + } + Assert.assertNotNull(sn1.getReservedContainer()); + + final CapacityScheduler cs = (CapacityScheduler) scheduler; + final CapacityScheduler spyCs = Mockito.spy(cs); + final AtomicBoolean isFirstReserve = new AtomicBoolean(true); + final AtomicBoolean isChecked = new AtomicBoolean(false); + // handle CapacityScheduler#tryCommit, + // reproduce the process that can raise IllegalStateException before + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Exception { + ResourceCommitRequest request = + (ResourceCommitRequest) invocation.getArguments()[1]; + if (request.getContainersToReserve().size() > 0 && isFirstReserve + .compareAndSet(true, false)) { + // release app1-container_03 on nm2 + RMContainer killableContainer = + sn2.getCopiedListOfRunningContainers().get(0); + cs.completedContainer(killableContainer, ContainerStatus + .newInstance(killableContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL); + Assert.assertEquals(0, sn2.getCopiedListOfRunningContainers().size()); + // unreserve app1-container_04 on nm1 + // and allocate app1-container_05 on nm2 + cs.handle(new NodeUpdateSchedulerEvent(sn2.getRMNode())); + int waitTime = 1000; + while (waitTime > 0 + && sn2.getCopiedListOfRunningContainers().size() == 0) { + waitTime -= 10; + Thread.sleep(10); + } + Assert.assertEquals(1, sn2.getCopiedListOfRunningContainers().size()); + Assert.assertNull(sn1.getReservedContainer()); + + // reserve app2-container_02 on nm1 + ResourceRequest rr3 = ResourceRequest + .newInstance(Priority.newInstance(0), "*", + Resources.createResource(5 * GB), 1); + am2.allocate(Arrays.asList(rr3), null); + cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode())); + waitTime = 1000; + while (waitTime > 0 && sn1.getReservedContainer() == null) { + waitTime -= 10; + Thread.sleep(10); + } + Assert.assertNotNull(sn1.getReservedContainer()); + + // call real apply + try { + cs.tryCommit((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1]); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + isChecked.set(true); + } else { + cs.tryCommit((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1]); + } + return null; + } + }).when(spyCs).tryCommit(Mockito.any(Resource.class), + Mockito.any(ResourceCommitRequest.class)); + + spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode())); + + waitTime = 1000; + while (waitTime > 0 && !isChecked.get()) { + waitTime -= 10; + Thread.sleep(10); + } + rm.stop(); + } private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, int nContainer, Resource resource, int priority, int startContainerId) --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org