From common-commits-return-90492-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Wed Nov 7 03:18:57 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D5D2D180658 for ; Wed, 7 Nov 2018 03:18:56 +0100 (CET) Received: (qmail 72229 invoked by uid 500); 7 Nov 2018 02:18:55 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 72220 invoked by uid 99); 7 Nov 2018 02:18:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Nov 2018 02:18:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BC1A4E120E; Wed, 7 Nov 2018 02:18:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aajisaka@apache.org To: common-commits@hadoop.apache.org Message-Id: <649806440d6f44ad8bbc9231fe0edbd2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-8233. NPE in CapacityScheduler#tryCommit when handling allocate/reserve proposal whose allocatedOrReservedContainer is null. Contributed by Tao Yang. Date: Wed, 7 Nov 2018 02:18:55 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-3.2 ff5a2cb5c -> 41c4e9583 YARN-8233. NPE in CapacityScheduler#tryCommit when handling allocate/reserve proposal whose allocatedOrReservedContainer is null. Contributed by Tao Yang. (cherry picked from commit 951c98f89059d64fda8456366f680eff4a7a6785) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/41c4e958 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/41c4e958 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/41c4e958 Branch: refs/heads/branch-3.2 Commit: 41c4e9583ddb0e832d277a4298ad8474e267e9ff Parents: ff5a2cb Author: Akira Ajisaka Authored: Wed Nov 7 11:17:35 2018 +0900 Committer: Akira Ajisaka Committed: Wed Nov 7 11:18:48 2018 +0900 ---------------------------------------------------------------------- .../scheduler/capacity/CapacityScheduler.java | 86 +++++++++++++------- .../TestCapacitySchedulerAsyncScheduling.java | 83 +++++++++++++++++++ 2 files changed, 141 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/41c4e958/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 4befae7..e89fce0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2768,7 +2768,11 @@ public class CapacityScheduler extends .getContainersToKill().isEmpty()) { list = new ArrayList<>(); for (RMContainer rmContainer : csAssignment.getContainersToKill()) { - list.add(getSchedulerContainer(rmContainer, false)); + SchedulerContainer schedulerContainer = + getSchedulerContainer(rmContainer, false); + if (schedulerContainer != null) { + list.add(schedulerContainer); + } } } @@ -2776,10 +2780,16 @@ public class CapacityScheduler extends if (null == list) { list = new ArrayList<>(); } - list.add( - getSchedulerContainer(csAssignment.getExcessReservation(), false)); + SchedulerContainer schedulerContainer = + getSchedulerContainer(csAssignment.getExcessReservation(), false); + if (schedulerContainer != null) { + list.add(schedulerContainer); + } } + if (list != null && list.isEmpty()) { + list = null; + } return list; } @@ -2864,11 +2874,15 @@ public class CapacityScheduler extends ((RMContainerImpl)rmContainer).setAllocationTags( new HashSet<>(schedulingRequest.getAllocationTags())); - allocated = new ContainerAllocationProposal<>( - getSchedulerContainer(rmContainer, true), - null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, - resource); + SchedulerContainer + schedulerContainer = getSchedulerContainer(rmContainer, true); + if (schedulerContainer == null) { + allocated = null; + } else { + allocated = new ContainerAllocationProposal<>(schedulerContainer, + null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, resource); + } } if (null != allocated) { @@ -2898,16 +2912,27 @@ public class CapacityScheduler extends csAssignment.getAssignmentInformation().getAllocationDetails(); if (!allocations.isEmpty()) { RMContainer rmContainer = allocations.get(0).rmContainer; - allocated = new ContainerAllocationProposal<>( - getSchedulerContainer(rmContainer, true), - getSchedulerContainersToRelease(csAssignment), - getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), - false), csAssignment.getType(), - csAssignment.getRequestLocalityType(), - csAssignment.getSchedulingMode() != null ? - csAssignment.getSchedulingMode() : - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, - csAssignment.getResource()); + SchedulerContainer + schedulerContainer = getSchedulerContainer(rmContainer, true); + if (schedulerContainer == null) { + allocated = null; + // Decrease unconfirmed resource if app is alive + FiCaSchedulerApp app = getApplicationAttempt( + rmContainer.getApplicationAttemptId()); + if (app != null) { + app.decUnconfirmedRes(rmContainer.getAllocatedResource()); + } + } else { + allocated = new ContainerAllocationProposal<>(schedulerContainer, + getSchedulerContainersToRelease(csAssignment), + getSchedulerContainer( + csAssignment.getFulfilledReservedContainer(), false), + csAssignment.getType(), csAssignment.getRequestLocalityType(), + csAssignment.getSchedulingMode() != null ? + csAssignment.getSchedulingMode() : + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, + csAssignment.getResource()); + } } // Reserved something @@ -2915,16 +2940,21 @@ public class CapacityScheduler extends csAssignment.getAssignmentInformation().getReservationDetails(); if (!reservation.isEmpty()) { RMContainer rmContainer = reservation.get(0).rmContainer; - reserved = new ContainerAllocationProposal<>( - getSchedulerContainer(rmContainer, false), - getSchedulerContainersToRelease(csAssignment), - getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), - false), csAssignment.getType(), - csAssignment.getRequestLocalityType(), - csAssignment.getSchedulingMode() != null ? - csAssignment.getSchedulingMode() : - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, - csAssignment.getResource()); + SchedulerContainer + schedulerContainer = getSchedulerContainer(rmContainer, false); + if (schedulerContainer == null) { + reserved = null; + } else { + reserved = new ContainerAllocationProposal<>(schedulerContainer, + getSchedulerContainersToRelease(csAssignment), + getSchedulerContainer( + csAssignment.getFulfilledReservedContainer(), false), + csAssignment.getType(), csAssignment.getRequestLocalityType(), + csAssignment.getSchedulingMode() != null ? + csAssignment.getSchedulingMode() : + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, + csAssignment.getResource()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/41c4e958/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 840d30d..67c504d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -56,8 +56,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -843,6 +846,86 @@ public class TestCapacitySchedulerAsyncScheduling { return new ResourceCommitRequest(allocateProposals, null, null); } + @Test(timeout = 30000) + public void testReturnNullWhenGetSchedulerContainer() throws Exception { + // disable async-scheduling for simulating complex scenario + Configuration disableAsyncConf = new Configuration(conf); + disableAsyncConf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false); + + // init RM & NMs + final MockRM rm = new MockRM(disableAsyncConf); + rm.start(); + final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB); + final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB); + rm.drainEvents(); + CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); + SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId()); + RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode(); + SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId()); + + // launch app1-am on nm1 + RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default", + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // app2 asks 1 * 1G container + am1.allocate(ImmutableList.of(ResourceRequest + .newInstance(Priority.newInstance(0), "*", + Resources.createResource(1 * GB), 1)), null); + RMContainer amContainer = cs.getRMContainer( + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1)); + + // spy CapacityScheduler + final CapacityScheduler spyCs = Mockito.spy(cs); + // hook CapacityScheduler#submitResourceCommitRequest + List assignmentSnapshots = new ArrayList<>(); + Mockito.doAnswer(new Answer() { + public Boolean answer(InvocationOnMock invocation) throws Exception { + CSAssignment assignment = (CSAssignment) invocation.getArguments()[1]; + if (cs.getNode(nm1.getNodeId()) != null) { + // decommission nm1 for first allocation on nm1 + cs.getRMContext().getDispatcher().getEventHandler().handle( + new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION)); + rm.drainEvents(); + Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState()); + Assert.assertNull(cs.getNode(nm1.getNodeId())); + assignmentSnapshots.add(assignment); + } else { + // add am container on nm1 to containersToKill + // for second allocation on nm2 + assignment.setContainersToKill(ImmutableList.of(amContainer)); + } + // check no NPE in actual submit, before YARN-8233 will throw NPE + cs.submitResourceCommitRequest((Resource) invocation.getArguments()[0], + assignment); + return false; + } + }).when(spyCs).submitResourceCommitRequest(Mockito.any(Resource.class), + Mockito.any(CSAssignment.class)); + + // allocation on nm1, test return null when get scheduler container + CandidateNodeSet candidateNodeSet = + new SimpleCandidateNodeSet(sn1); + spyCs.allocateContainersToNode(candidateNodeSet, false); + // make sure unconfirmed resource is decreased correctly + Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId()) + .hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + + // allocation on nm2, + // test return null when get scheduler container to release + candidateNodeSet = + new SimpleCandidateNodeSet(sn2); + spyCs.allocateContainersToNode(candidateNodeSet, false); + // make sure unconfirmed resource is decreased correctly + Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId()) + .hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + + rm.stop(); + } + private void keepNMHeartbeat(List mockNMs, int interval) { if (nmHeartbeatThread != null) { nmHeartbeatThread.setShouldStop(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org