Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 566632009F3 for ; Fri, 6 May 2016 04:40:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 55041160A04; Fri, 6 May 2016 02:40:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 54604160A05 for ; Fri, 6 May 2016 04:40:17 +0200 (CEST) Received: (qmail 10076 invoked by uid 500); 6 May 2016 02:40:16 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 10067 invoked by uid 99); 6 May 2016 02:40:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 May 2016 02:40:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5CC66DFC7E; Fri, 6 May 2016 02:40:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vinodkv@apache.org To: common-commits@hadoop.apache.org Message-Id: <492e4b63d56b43c49d1fffc965a21805@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: MAPREDUCE-6514. Fixed MapReduce ApplicationMaster to properly updated resources ask after ramping down of all reducers avoiding job hangs. Contributed by Varun Saxena and Wangda Tan. Updated the 2.7 branch patch to fix a minor issue in Date: Fri, 6 May 2016 02:40:16 +0000 (UTC) archived-at: Fri, 06 May 2016 02:40:18 -0000 Repository: hadoop Updated Branches: refs/heads/branch-2.7 2340e2908 -> 9f51a3f43 MAPREDUCE-6514. 
Fixed MapReduce ApplicationMaster to properly updated resources ask after ramping down of all reducers avoiding job hangs. Contributed by Varun Saxena and Wangda Tan. Updated the 2.7 branch patch to fix a minor issue in TestRMContainerAllocator.java (cherry picked from commit 8d48266720dcf0e71cfd87fef18b60a53aa1bef9) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9f51a3f4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9f51a3f4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9f51a3f4 Branch: refs/heads/branch-2.7 Commit: 9f51a3f430e4a8efc54193af4fe9fc1509f9980f Parents: 2340e29 Author: Vinod Kumar Vavilapalli Authored: Thu May 5 19:01:52 2016 -0700 Committer: Vinod Kumar Vavilapalli Committed: Thu May 5 19:39:25 2016 -0700 ---------------------------------------------------------------------- .../v2/app/rm/RMContainerAllocator.java | 26 ++-- .../v2/app/rm/RMContainerRequestor.java | 6 + .../v2/app/rm/TestRMContainerAllocator.java | 124 +++++++++++++++++++ 3 files changed, 146 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f51a3f4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 28c869a..c5b1a01 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -507,12 +507,7 @@ public class RMContainerAllocator extends RMContainerRequestor } private void clearAllPendingReduceRequests() { - LOG.info("Ramping down all scheduled reduces:" - + scheduledRequests.reduces.size()); - for (ContainerRequest req : scheduledRequests.reduces.values()) { - pendingReduces.add(req); - } - scheduledRequests.reduces.clear(); + rampDownReduces(Integer.MAX_VALUE); } private void preemptReducer(int hangingMapRequests) { @@ -684,9 +679,13 @@ public class RMContainerAllocator extends RMContainerRequestor @Private public void rampDownReduces(int rampDown) { //remove from the scheduled and move back to pending - for (int i = 0; i < rampDown; i++) { + while (rampDown > 0) { ContainerRequest request = scheduledRequests.removeReduce(); + if (request == null) { + return; + } pendingReduces.add(request); + rampDown--; } } @@ -905,6 +904,11 @@ public class RMContainerAllocator extends RMContainerRequestor Resources.add(assignedMapResource, assignedReduceResource)); } + @VisibleForTesting + public int getNumOfPendingReduces() { + return pendingReduces.size(); + } + @Private @VisibleForTesting class ScheduledRequests { @@ -920,8 +924,9 @@ public class RMContainerAllocator extends RMContainerRequestor @VisibleForTesting final Map maps = new LinkedHashMap(); - - private final LinkedHashMap reduces = + + @VisibleForTesting + final LinkedHashMap reduces = new LinkedHashMap(); boolean remove(TaskAttemptId tId) { @@ -1316,7 +1321,8 @@ public class RMContainerAllocator extends RMContainerRequestor class AssignedRequests { private final Map containerToAttemptMap = new HashMap(); - private final LinkedHashMap maps = + @VisibleForTesting + final LinkedHashMap maps = new LinkedHashMap(); @VisibleForTesting final LinkedHashMap reduces = 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f51a3f4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index f8f3b87..b466668 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -554,4 +554,10 @@ public abstract class RMContainerRequestor extends RMCommunicator { public Set getBlacklistedNodes() { return blacklistedNodes; } + + @Private + @VisibleForTesting + Set getAsk() { + return ask; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f51a3f4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index a9a247a..c2c3510 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -1695,6 +1695,8 @@ public class TestRMContainerAllocator { when(context.getApplicationID()).thenReturn(appId); when(context.getApplicationAttemptId()).thenReturn(appAttemptId); when(context.getJob(isA(JobId.class))).thenReturn(job); + when(context.getClock()).thenReturn( + new ControlledClock(new SystemClock())); when(context.getClusterInfo()).thenReturn( new ClusterInfo(Resource.newInstance(10240, 1))); when(context.getEventHandler()).thenReturn(new EventHandler() { @@ -2685,6 +2687,128 @@ public class TestRMContainerAllocator { allocator.schedule(); } + @Test + public void testUpdateAskOnRampDownAllReduces() throws Exception { + LOG.info("Running testUpdateAskOnRampDownAllReduces"); + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm.getRMContext().getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + // Use a controlled clock to advance time for test. 
+ ControlledClock clock = (ControlledClock)allocator.getContext().getClock(); + clock.setTime(System.currentTimeMillis()); + + // Register nodes to RM. + MockNM nodeManager = rm.registerNode("h1:1234", 1024); + dispatcher.await(); + + // Request 2 maps and 1 reducer (some on nodes which are not registered). + ContainerRequestEvent event1 = + createReq(jobId, 1, 1024, new String[] { "h1" }); + allocator.sendRequest(event1); + ContainerRequestEvent event2 = + createReq(jobId, 2, 1024, new String[] { "h2" }); + allocator.sendRequest(event2); + ContainerRequestEvent event3 = + createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + allocator.sendRequest(event3); + + // This will tell the scheduler about the requests but there will be no + // allocations as nodes are not added. + allocator.schedule(); + dispatcher.await(); + + // Advance clock so that maps can be considered as hanging. + clock.setTime(System.currentTimeMillis() + 500000L); + + // Request for another reducer on h3 which has not registered. + ContainerRequestEvent event4 = + createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + allocator.sendRequest(event4); + + allocator.schedule(); + dispatcher.await(); + + // Update resources in scheduler through node heartbeat from h1. + nodeManager.nodeHeartbeat(true); + dispatcher.await(); + + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); + allocator.schedule(); + dispatcher.await(); + + // One map is assigned. + Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); + // Send deallocate request for map so that no maps are assigned after this. + ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false); + allocator.sendDeallocate(deallocate); + // Now one reducer should be scheduled and one should be pending. 
+ Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size()); + Assert.assertEquals(1, allocator.getNumOfPendingReduces()); + // No map should be assigned and one should be scheduled. + Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); + Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); + + Assert.assertEquals(6, allocator.getAsk().size()); + for (ResourceRequest req : allocator.getAsk()) { + boolean isReduce = + req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE); + if (isReduce) { + // 1 reducer each asked on h2, * and default-rack + Assert.assertTrue((req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack") || + req.getResourceName().equals("h2")) && req.getNumContainers() == 1); + } else { //map + // 0 mappers asked on h1 and 1 each on * and default-rack + Assert.assertTrue(((req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack")) && + req.getNumContainers() == 1) || (req.getResourceName().equals("h1") + && req.getNumContainers() == 0)); + } + } + // On next allocate request to scheduler, headroom reported will be 0. + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0)); + allocator.schedule(); + dispatcher.await(); + // After allocate response from scheduler, all scheduled reduces are ramped + // down and moved to pending. 3 asks are also updated with 0 containers to + // indicate ramping down of reduces to scheduler. 
+ Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size()); + Assert.assertEquals(2, allocator.getNumOfPendingReduces()); + Assert.assertEquals(3, allocator.getAsk().size()); + for (ResourceRequest req : allocator.getAsk()) { + Assert.assertEquals( + RMContainerAllocator.PRIORITY_REDUCE, req.getPriority()); + Assert.assertTrue(req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack") || + req.getResourceName().equals("h2")); + Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability()); + Assert.assertEquals(0, req.getNumContainers()); + } + } + private static class MockScheduler implements ApplicationMasterProtocol { ApplicationAttemptId attemptId; long nextContainerId = 10; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org