From: varunsaxena@apache.org
To: common-commits@hadoop.apache.org
Date: Sun, 06 Nov 2016 09:58:01 -0000
Subject: [15/50] [abbrv] hadoop git commit: MAPREDUCE-6765. MR should not schedule container requests in cases where reducer or mapper containers demand resource larger than the maximum supported (haibochen via rkanter)

MAPREDUCE-6765. MR should not schedule container requests in cases where
reducer or mapper containers demand resource larger than the maximum
supported (haibochen via rkanter)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc2b69eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc2b69eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc2b69eb

Branch: refs/heads/YARN-5355
Commit: fc2b69eba1c5df59f6175205c27dc7b584df50c0
Parents: 1b6ecaf
Author: Robert Kanter
Authored: Tue Nov 1 20:47:25 2016 -0700
Committer: Robert Kanter
Committed: Tue Nov 1 20:47:25 2016 -0700

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         | 171 +++++++++++--------
 .../v2/app/rm/TestRMContainerAllocator.java     |  96 ++++++++++-
 2 files changed, 195 insertions(+), 72 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc2b69eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index e3b673a..db8f337 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -151,10 +151,10 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   //holds information about the assigned containers to task attempts
   private final AssignedRequests assignedRequests;
-  
+
   //holds scheduled requests to be fulfilled by RM
   private final ScheduledRequests scheduledRequests = new ScheduledRequests();
-  
+
   private int containersAllocated = 0;
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
@@ -370,76 +370,16 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
   }
 
-  @SuppressWarnings({ "unchecked" })
   protected synchronized void handleEvent(ContainerAllocatorEvent event) {
     recalculateReduceSchedule = true;
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
       ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
-      JobId jobId = getJob().getID();
-      Resource supportedMaxContainerCapability = getMaxContainerCapability();
-      if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-        if (mapResourceRequest.equals(Resources.none())) {
-          mapResourceRequest = reqEvent.getCapability();
-          eventHandler.handle(new JobHistoryEvent(jobId,
-              new NormalizedResourceEvent(
-                  org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
-                      .getMemorySize())));
-          LOG.info("mapResourceRequest:" + mapResourceRequest);
-          if (mapResourceRequest.getMemorySize() > supportedMaxContainerCapability
-              .getMemorySize()
-              || mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
-                  .getVirtualCores()) {
-            String diagMsg =
-                "MAP capability required is more than the supported "
-                    + "max container capability in the cluster. Killing the Job. mapResourceRequest: "
-                    + mapResourceRequest + " maxContainerCapability:"
-                    + supportedMaxContainerCapability;
-            LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
-            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
-          }
-        }
-        // set the resources
-        reqEvent.getCapability().setMemorySize(mapResourceRequest.getMemorySize());
-        reqEvent.getCapability().setVirtualCores(
-            mapResourceRequest.getVirtualCores());
-        scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+      boolean isMap = reqEvent.getAttemptID().getTaskId().getTaskType().
+          equals(TaskType.MAP);
+      if (isMap) {
+        handleMapContainerRequest(reqEvent);
       } else {
-        if (reduceResourceRequest.equals(Resources.none())) {
-          reduceResourceRequest = reqEvent.getCapability();
-          eventHandler.handle(new JobHistoryEvent(jobId,
-              new NormalizedResourceEvent(
-                  org.apache.hadoop.mapreduce.TaskType.REDUCE,
-                  reduceResourceRequest.getMemorySize())));
-          LOG.info("reduceResourceRequest:" + reduceResourceRequest);
-          if (reduceResourceRequest.getMemorySize() > supportedMaxContainerCapability
-              .getMemorySize()
-              || reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
-                  .getVirtualCores()) {
-            String diagMsg =
-                "REDUCE capability required is more than the "
-                    + "supported max container capability in the cluster. Killing the "
-                    + "Job. reduceResourceRequest: " + reduceResourceRequest
-                    + " maxContainerCapability:"
-                    + supportedMaxContainerCapability;
-            LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
-            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
-          }
-        }
-        // set the resources
-        reqEvent.getCapability().setMemorySize(reduceResourceRequest.getMemorySize());
-        reqEvent.getCapability().setVirtualCores(
-            reduceResourceRequest.getVirtualCores());
-        if (reqEvent.getEarlierAttemptFailed()) {
-          //add to the front of queue for fail fast
-          pendingReduces.addFirst(new ContainerRequest(reqEvent,
-              PRIORITY_REDUCE, reduceNodeLabelExpression));
-        } else {
-          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
-              reduceNodeLabelExpression));
-          //reduces are added to pending and are slowly ramped up
-        }
+        handleReduceContainerRequest(reqEvent);
       }
     } else if (
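
Note: before this patch, handleEvent() fired JOB_KILL for an oversized
request but still handed the request to the scheduler; the handlers
extracted below only queue requests that fit. The acceptance rule the two
new handlers share reduces to one predicate. A minimal sketch of it (not
part of the patch; exceedsMaxContainerCapability is a hypothetical helper
name used only for illustration):

    import org.apache.hadoop.yarn.api.records.Resource;

    final class CapabilityCheckSketch {
      // Mirrors the check in both new handlers: a request is unsatisfiable
      // if either dimension exceeds the cluster-wide maximum container
      // capability reported by the RM.
      static boolean exceedsMaxContainerCapability(Resource requested,
          Resource supportedMax) {
        return requested.getMemorySize() > supportedMax.getMemorySize()
            || requested.getVirtualCores() > supportedMax.getVirtualCores();
      }
    }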
@@ -476,6 +416,103 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
   }
 
+  @SuppressWarnings({ "unchecked" })
+  private void handleReduceContainerRequest(ContainerRequestEvent reqEvent) {
+    assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
+        TaskType.REDUCE));
+
+    Resource supportedMaxContainerCapability = getMaxContainerCapability();
+    JobId jobId = getJob().getID();
+
+    if (reduceResourceRequest.equals(Resources.none())) {
+      reduceResourceRequest = reqEvent.getCapability();
+      eventHandler.handle(new JobHistoryEvent(jobId,
+          new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.REDUCE,
+              reduceResourceRequest.getMemorySize())));
+      LOG.info("reduceResourceRequest:" + reduceResourceRequest);
+    }
+
+    boolean reduceContainerRequestAccepted = true;
+    if (reduceResourceRequest.getMemorySize() >
+        supportedMaxContainerCapability.getMemorySize()
+        ||
+        reduceResourceRequest.getVirtualCores() >
+        supportedMaxContainerCapability.getVirtualCores()) {
+      reduceContainerRequestAccepted = false;
+    }
+
+    if (reduceContainerRequestAccepted) {
+      // set the resources
+      reqEvent.getCapability().setVirtualCores(
+          reduceResourceRequest.getVirtualCores());
+      reqEvent.getCapability().setMemorySize(
+          reduceResourceRequest.getMemorySize());
+
+      if (reqEvent.getEarlierAttemptFailed()) {
+        //previously failed reducers are added to the front for fail fast
+        pendingReduces.addFirst(new ContainerRequest(reqEvent,
+            PRIORITY_REDUCE, reduceNodeLabelExpression));
+      } else {
+        //reduces are added to pending queue and are slowly ramped up
+        pendingReduces.add(new ContainerRequest(reqEvent,
+            PRIORITY_REDUCE, reduceNodeLabelExpression));
+      }
+    } else {
+      String diagMsg = "REDUCE capability required is more than the " +
+          "supported max container capability in the cluster. Killing" +
+          " the Job. reduceResourceRequest: " + reduceResourceRequest +
+          " maxContainerCapability:" + supportedMaxContainerCapability;
+      LOG.info(diagMsg);
+      eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
+      eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    }
+  }
+
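Note: an accepted reduce request whose earlier attempt failed goes to the
front of pendingReduces so a persistently failing reducer surfaces quickly,
while fresh requests ramp up slowly from the back. A standalone illustration
of that ordering (pendingReduces itself is internal allocator state; this
sketch only demonstrates the deque discipline with placeholder names):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class PendingReducesOrderSketch {
      public static void main(String[] args) {
        // Fresh reduce requests join the tail; a retried one jumps the line.
        Deque<String> pendingReduces = new ArrayDeque<>();
        pendingReduces.add("reduce_000001");            // fresh: tail
        pendingReduces.add("reduce_000002");            // fresh: tail
        pendingReduces.addFirst("reduce_000000_retry"); // earlier attempt failed
        System.out.println(pendingReduces);
        // prints: [reduce_000000_retry, reduce_000001, reduce_000002]
      }
    }
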
+  @SuppressWarnings({ "unchecked" })
+  private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
+    assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
+        TaskType.MAP));
+
+    Resource supportedMaxContainerCapability = getMaxContainerCapability();
+    JobId jobId = getJob().getID();
+
+    if (mapResourceRequest.equals(Resources.none())) {
+      mapResourceRequest = reqEvent.getCapability();
+      eventHandler.handle(new JobHistoryEvent(jobId,
+          new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.MAP,
+              mapResourceRequest.getMemorySize())));
+      LOG.info("mapResourceRequest:" + mapResourceRequest);
+    }
+
+    boolean mapContainerRequestAccepted = true;
+    if (mapResourceRequest.getMemorySize() >
+        supportedMaxContainerCapability.getMemorySize()
+        ||
+        mapResourceRequest.getVirtualCores() >
+        supportedMaxContainerCapability.getVirtualCores()) {
+      mapContainerRequestAccepted = false;
+    }
+
+    if(mapContainerRequestAccepted) {
+      // set the resources
+      reqEvent.getCapability().setMemorySize(
+          mapResourceRequest.getMemorySize());
+      reqEvent.getCapability().setVirtualCores(
+          mapResourceRequest.getVirtualCores());
+      scheduledRequests.addMap(reqEvent); //maps are immediately scheduled
+    } else {
+      String diagMsg = "The required MAP capability is more than the " +
+          "supported max container capability in the cluster. Killing" +
+          " the Job. mapResourceRequest: " + mapResourceRequest +
+          " maxContainerCapability:" + supportedMaxContainerCapability;
+      LOG.info(diagMsg);
+      eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
+      eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    }
+  }
+
   private static String getHost(String contMgrAddress) {
     String host = contMgrAddress;
     String[] hostport = host.split(":");
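
Note, for context (not part of the patch): a job reaches the kill path above
by configuring task resources beyond the maximum container capability the
RM advertises. A hedged client-side sketch -- MRJobConfig.MAP_MEMORY_MB is
the real key for mapreduce.map.memory.mb, but the 1 TB figure and job name
are illustrative and assumed to exceed the cluster's
yarn.scheduler.maximum-allocation-mb:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class OversizedMapJobSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // ~1 TB per map container, assumed far above any node's maximum.
        conf.setInt(MRJobConfig.MAP_MEMORY_MB, 1_048_576);
        Job job = Job.getInstance(conf, "oversized-map-demo");
        // ...mapper/input/output setup elided; with this patch the AM kills
        // the job with the MAP capability diagnostic above instead of also
        // queueing a container ask that YARN can never satisfy.
      }
    }
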
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc2b69eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index bcce793..f9ee9cc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -1792,12 +1792,18 @@ public class TestRMContainerAllocator {
 
   private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
       int memory, String[] hosts) {
-    return createReq(jobId, taskAttemptId, memory, hosts, false, false);
+    return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false);
   }
 
-  private ContainerRequestEvent
-      createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts,
-          boolean earlierFailedAttempt, boolean reduce) {
+  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+      int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
+    return createReq(jobId, taskAttemptId, mem,
+        1, hosts, earlierFailedAttempt, reduce);
+  }
+
+  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+      int memory, int vcore, String[] hosts, boolean earlierFailedAttempt,
+      boolean reduce) {
     TaskId taskId;
     if (reduce) {
       taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
@@ -1806,7 +1812,7 @@ public class TestRMContainerAllocator {
     }
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
         taskAttemptId);
-    Resource containerNeed = Resource.newInstance(memory, 1);
+    Resource containerNeed = Resource.newInstance(memory, vcore);
     if (earlierFailedAttempt) {
       return ContainerRequestEvent
           .createContainerRequestEventForFailedContainer(attemptId,
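
Note: the new createReq overload threads a vcore parameter through to
Resource.newInstance, so tests can now exercise the vcores dimension of the
capability check rather than only memory. For example (values illustrative):

    import org.apache.hadoop.yarn.api.records.Resource;

    final class VcoreRequestSketch {
      public static void main(String[] args) {
        Resource maxCap = Resource.newInstance(1024, 1); // 1024 MB, 1 vcore
        // Fits on memory but oversubscribes vcores -- now expressible in tests.
        Resource oversizedByCores =
            Resource.newInstance(512, maxCap.getVirtualCores() + 1);
        System.out.println(oversizedByCores);
      }
    }
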
@@ -2608,6 +2614,86 @@ public class TestRMContainerAllocator {
   }
 
   @Test
+  public void testUnsupportedMapContainerRequirement() throws Exception {
+    final Resource maxContainerSupported = Resource.newInstance(1, 1);
+
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    final ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    final JobId jobId =
+        MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    final Configuration conf = new Configuration();
+
+    final MyContainerAllocator allocator = new MyContainerAllocator(null,
+        conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) {
+      @Override
+      protected void register() {
+      }
+      @Override
+      protected ApplicationMasterProtocol createSchedulerProxy() {
+        return mockScheduler;
+      }
+      @Override
+      protected Resource getMaxContainerCapability() {
+        return maxContainerSupported;
+      }
+    };
+
+    ContainerRequestEvent mapRequestEvt = createReq(jobId, 0,
+        (int) (maxContainerSupported.getMemorySize() + 10),
+        maxContainerSupported.getVirtualCores(),
+        new String[0], false, false);
+    allocator.sendRequests(Arrays.asList(mapRequestEvt));
+    allocator.schedule();
+
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+  }
+
+  @Test
+  public void testUnsupportedReduceContainerRequirement() throws Exception {
+    final Resource maxContainerSupported = Resource.newInstance(1, 1);
+
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    final ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    final JobId jobId =
+        MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    final Configuration conf = new Configuration();
+
+    final MyContainerAllocator allocator = new MyContainerAllocator(null,
+        conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) {
+      @Override
+      protected void register() {
+      }
+      @Override
+      protected ApplicationMasterProtocol createSchedulerProxy() {
+        return mockScheduler;
+      }
+      @Override
+      protected Resource getMaxContainerCapability() {
+        return maxContainerSupported;
+      }
+    };
+
+    ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0,
+        (int) (maxContainerSupported.getMemorySize() + 10),
+        maxContainerSupported.getVirtualCores(),
+        new String[0], false, true);
+    allocator.sendRequests(Arrays.asList(reduceRequestEvt));
+    // Reducer container requests are added to the pending queue upon request,
+    // schedule all reducers here so that we can observe if reducer requests
+    // are accepted by RMContainerAllocator on RM side.
+    allocator.scheduleAllReduces();
+    allocator.schedule();
+
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+  }
+
+  @Test
   public void testRMUnavailable() throws Exception {
     Configuration conf = new Configuration();
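
----------------------------------------------------------------------
Closing note: the two tests above pin the advertised maximum at 1 MB / 1
vcore by overriding getMaxContainerCapability(), then assert that the last
ANY ask seen by the mock scheduler requests zero map (resp. reduce)
containers, i.e. the oversized request was never scheduled. The same
acceptance rule can also be unit-tested in isolation; a minimal JUnit 4
sketch using the hypothetical predicate from the earlier aside (class and
method names are illustrative):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.junit.Assert;
    import org.junit.Test;

    public class CapabilityPredicateTest {
      private static boolean exceedsMaxContainerCapability(Resource requested,
          Resource supportedMax) {
        return requested.getMemorySize() > supportedMax.getMemorySize()
            || requested.getVirtualCores() > supportedMax.getVirtualCores();
      }

      @Test
      public void eitherOversizedDimensionIsRejected() {
        Resource max = Resource.newInstance(1024, 2);
        Assert.assertTrue(exceedsMaxContainerCapability(
            Resource.newInstance(2048, 1), max)); // memory too large
        Assert.assertTrue(exceedsMaxContainerCapability(
            Resource.newInstance(512, 4), max));  // vcores too large
        Assert.assertFalse(exceedsMaxContainerCapability(
            Resource.newInstance(1024, 2), max)); // exactly at the max fits
      }
    }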