Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1541711B6B for ; Thu, 17 Jul 2014 19:01:52 +0000 (UTC) Received: (qmail 44986 invoked by uid 500); 17 Jul 2014 19:01:51 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 44912 invoked by uid 500); 17 Jul 2014 19:01:51 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 44901 invoked by uid 99); 17 Jul 2014 19:01:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jul 2014 19:01:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jul 2014 19:01:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6E60323888D7; Thu, 17 Jul 2014 19:01:28 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1611436 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/... Date: Thu, 17 Jul 2014 19:01:27 -0000 To: mapreduce-commits@hadoop.apache.org From: jianhe@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140717190128.6E60323888D7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jianhe Date: Thu Jul 17 19:01:27 2014 New Revision: 1611436 URL: http://svn.apache.org/r1611436 Log: Merge r1611434 from trunk. MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving RM-restart. Contributed by Rohith Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1611436&r1=1611435&r2=1611436&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Jul 17 19:01:27 2014 @@ -5,6 +5,8 @@ Release 2.6.0 - UNRELEASED INCOMPATIBLE CHANGES NEW FEATURES + MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving + RM-restart. Contributed by Rohith IMPROVEMENTS Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1611436&r1=1611435&r2=1611436&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Thu Jul 17 19:01:27 2014 @@ -64,6 +64,7 @@ public class LocalContainerAllocator ext private int nmPort; private int nmHttpPort; private ContainerId containerId; + protected int lastResponseID; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -119,6 +120,11 @@ public class LocalContainerAllocator ext if (allocateResponse.getAMCommand() != null) { switch(allocateResponse.getAMCommand()) { case AM_RESYNC: + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + this.lastResponseID = 0; + register(); + break; case AM_SHUTDOWN: LOG.info("Event from RM: shutting down Application Master"); // This can happen if the RM has been restarted. If it is in that state, Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1611436&r1=1611435&r2=1611436&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Thu Jul 17 19:01:27 2014 @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -216,20 +217,27 @@ public abstract class RMCommunicator ext FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(finishState, sb.toString(), historyUrl); - while (true) { - FinishApplicationMasterResponse response = - scheduler.finishApplicationMaster(request); - if (response.getIsUnregistered()) { - // When excepting ClientService, other services are already stopped, - // it is safe to let clients know the final states. ClientService - // should wait for some time so clients have enough time to know the - // final states. - RunningAppContext raContext = (RunningAppContext) context; - raContext.markSuccessfulUnregistration(); - break; + try { + while (true) { + FinishApplicationMasterResponse response = + scheduler.finishApplicationMaster(request); + if (response.getIsUnregistered()) { + // When excepting ClientService, other services are already stopped, + // it is safe to let clients know the final states. ClientService + // should wait for some time so clients have enough time to know the + // final states. + RunningAppContext raContext = (RunningAppContext) context; + raContext.markSuccessfulUnregistration(); + break; + } + LOG.info("Waiting for application to be successfully unregistered."); + Thread.sleep(rmPollInterval); } - LOG.info("Waiting for application to be successfully unregistered."); - Thread.sleep(rmPollInterval); + } catch (ApplicationMasterNotRegisteredException e) { + // RM might have restarted or failed over and so lost the fact that AM had + // registered before. + register(); + doUnregistration(); } } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1611436&r1=1611435&r2=1611436&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jul 17 19:01:27 2014 @@ -383,6 +383,7 @@ public class RMContainerAllocator extend removed = true; assignedRequests.remove(aId); containersReleased++; + pendingRelease.add(containerId); release(containerId); } } @@ -631,6 +632,15 @@ public class RMContainerAllocator extend if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + lastResponseID = 0; + + // Registering to allow RM to discover an active AM for this + // application + register(); + addOutstandingRequestOnResync(); + break; case AM_SHUTDOWN: // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. @@ -682,6 +692,7 @@ public class RMContainerAllocator extend LOG.error("Container complete event for unknown container id " + cont.getContainerId()); } else { + pendingRelease.remove(cont.getContainerId()); assignedRequests.remove(attemptID); // send the container completed event to Task attempt @@ -971,6 +982,7 @@ public class RMContainerAllocator extend private void containerNotAssigned(Container allocated) { containersReleased++; + pendingRelease.add(allocated.getId()); release(allocated.getId()); } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1611436&r1=1611435&r2=1611436&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Jul 17 19:01:27 2014 @@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -58,7 +59,7 @@ public abstract class RMContainerRequest private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); - private int lastResponseID; + protected int lastResponseID; private Resource availableResources; private final RecordFactory recordFactory = @@ -77,8 +78,11 @@ public abstract class RMContainerRequest // numContainers dont end up as duplicates private final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); - private final Set release = new TreeSet(); - + private final Set release = new TreeSet(); + // pendingRelease holds history or release requests.request is removed only if + // RM sends completedContainer. + // How it different from release? --> release is for per allocate() request. + protected Set pendingRelease = new TreeSet(); private boolean nodeBlacklistingEnabled; private int blacklistDisablePercent; private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false); @@ -186,6 +190,10 @@ public abstract class RMContainerRequest } catch (YarnException e) { throw new IOException(e); } + + if (isResyncCommand(allocateResponse)) { + return allocateResponse; + } lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -214,6 +222,28 @@ public abstract class RMContainerRequest return allocateResponse; } + protected boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } + + protected void addOutstandingRequestOnResync() { + for (Map> rr : remoteRequestsTable + .values()) { + for (Map capabalities : rr.values()) { + for (ResourceRequest request : capabalities.values()) { + addResourceRequestToAsk(request); + } + } + } + if (!ignoreBlacklisting.get()) { + blacklistAdditions.addAll(blacklistedNodes); + } + if (!pendingRelease.isEmpty()) { + release.addAll(pendingRelease); + } + } + // May be incorrect if there's multiple NodeManagers running on a single host. // knownNodeCount is based on node managers, not hosts. blacklisting is // currently based on hosts. Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1611436&r1=1611435&r2=1611436&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Thu Jul 17 19:01:27 2014 @@ -77,6 +77,7 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; @@ -94,9 +96,13 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -617,6 +623,10 @@ public class TestRMContainerAllocator { super(conf); } + public MyResourceManager(Configuration conf, RMStateStore store) { + super(conf, store); + } + @Override public void serviceStart() throws Exception { super.serviceStart(); @@ -1425,6 +1435,13 @@ public class TestRMContainerAllocator { rm.getMyFifoScheduler().lastBlacklistRemovals.size()); } + private static void assertAsksAndReleases(int expectedAsk, + int expectedRelease, MyResourceManager rm) { + Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size()); + Assert.assertEquals(expectedRelease, + rm.getMyFifoScheduler().lastRelease.size()); + } + private static class MyFifoScheduler extends FifoScheduler { public MyFifoScheduler(RMContext rmContext) { @@ -1439,6 +1456,7 @@ public class TestRMContainerAllocator { } List lastAsk = null; + List lastRelease = null; List lastBlacklistAdditions; List lastBlacklistRemovals; @@ -1457,6 +1475,7 @@ public class TestRMContainerAllocator { askCopy.add(reqCopy); } lastAsk = ask; + lastRelease = release; lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; return super.allocate( @@ -1504,6 +1523,20 @@ public class TestRMContainerAllocator { return new ContainerFailedEvent(attemptId, host); } + private ContainerAllocatorEvent createDeallocateEvent(JobId jobId, + int taskAttemptId, boolean reduce) { + TaskId taskId; + if (reduce) { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); + } else { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + } + TaskAttemptId attemptId = + MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); + return new ContainerAllocatorEvent(attemptId, + ContainerAllocator.EventType.CONTAINER_DEALLOCATE); + } + private void checkAssignments(ContainerRequestEvent[] requests, List assignments, boolean checkHostMatch) { @@ -1556,6 +1589,7 @@ public class TestRMContainerAllocator { = new ArrayList(); private MyResourceManager rm; private boolean isUnregistered = false; + private AllocateResponse allocateResponse; private static AppContext createAppContext( ApplicationAttemptId appAttemptId, Job job) { AppContext context = mock(AppContext.class); @@ -1665,6 +1699,10 @@ public class TestRMContainerAllocator { super.handleEvent(f); } + public void sendDeallocate(ContainerAllocatorEvent f) { + super.handleEvent(f); + } + // API to be used by tests public List schedule() throws Exception { @@ -1710,6 +1748,20 @@ public class TestRMContainerAllocator { public boolean isUnregistered() { return isUnregistered; } + + public void updateSchedulerProxy(MyResourceManager rm) { + scheduler = rm.getApplicationMasterService(); + } + + @Override + protected AllocateResponse makeRemoteRequest() throws IOException { + allocateResponse = super.makeRemoteRequest(); + return allocateResponse; + } + + public boolean isResyncCommand() { + return super.isResyncCommand(allocateResponse); + } } @Test @@ -2017,6 +2069,198 @@ public class TestRMContainerAllocator { Assert.assertTrue(allocator.isUnregistered()); } + // Step-1 : AM send allocate request for 2 ContainerRequests and 1 + // blackListeNode + // Step-2 : 2 containers are allocated by RM. + // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to + // RM + // Step-4 : On RM restart, AM(does not know RM is restarted) sends + // additional containerRequest(event4) and blacklisted nodes. + // Intern RM send resync command + // Step-5 : On Resync,AM sends all outstanding + // asks,release,blacklistAaddition + // and another containerRequest(event5) + // Step-6 : RM allocates containers i.e event3,event4 and cRequest5 + @Test + public void testRMContainerAllocatorResendsRequestsOnRMRestart() + throws Exception { + + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + + conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); + conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); + conf.setInt( + MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MyResourceManager rm1 = new MyResourceManager(conf, memStore); + rm1.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm1.getRMContext().getDispatcher(); + + // Submit the application + RMApp app = rm1.submitApp(1024); + dispatcher.await(); + + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + ApplicationAttemptId appAttemptId = + app.getCurrentAppAttempt().getAppAttemptId(); + rm1.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = + new MyContainerAllocator(rm1, conf, appAttemptId, mockJob); + + // Step-1 : AM send allocate request for 2 ContainerRequests and 1 + // blackListeNode + // create the container request + // send MAP request + ContainerRequestEvent event1 = + createReq(jobId, 1, 1024, new String[] { "h1" }); + allocator.sendRequest(event1); + + ContainerRequestEvent event2 = + createReq(jobId, 2, 2048, new String[] { "h1", "h2" }); + allocator.sendRequest(event2); + + // Send events to blacklist h2 + ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false); + allocator.sendFailure(f1); + + // send allocate request and 1 blacklisted nodes + List assignedContainers = + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, + assignedContainers.size()); + // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed + assertAsksAndReleases(3, 0, rm1); + assertBlacklistAdditionsAndRemovals(1, 0, rm1); + + nm1.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + // Step-2 : 2 containers are allocated by RM. + assignedContainers = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 2", 2, + assignedContainers.size()); + assertAsksAndReleases(0, 0, rm1); + assertBlacklistAdditionsAndRemovals(0, 0, rm1); + + assignedContainers = allocator.schedule(); + Assert.assertEquals("No of assignments must be 0", 0, + assignedContainers.size()); + assertAsksAndReleases(3, 0, rm1); + assertBlacklistAdditionsAndRemovals(0, 0, rm1); + + // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to + // RM + // send container request + ContainerRequestEvent event3 = + createReq(jobId, 3, 1000, new String[] { "h1" }); + allocator.sendRequest(event3); + + // send deallocate request + ContainerAllocatorEvent deallocate1 = + createDeallocateEvent(jobId, 1, false); + allocator.sendDeallocate(deallocate1); + + assignedContainers = allocator.schedule(); + Assert.assertEquals("No of assignments must be 0", 0, + assignedContainers.size()); + assertAsksAndReleases(3, 1, rm1); + assertBlacklistAdditionsAndRemovals(0, 0, rm1); + + // Phase-2 start 2nd RM is up + MyResourceManager rm2 = new MyResourceManager(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + allocator.updateSchedulerProxy(rm2); + dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); + + // NM should be rebooted on heartbeat, even first heartbeat for nm2 + NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); + + // new NM to represent NM re-register + nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); + dispatcher.await(); + + // Step-4 : On RM restart, AM(does not know RM is restarted) sends + // additional containerRequest(event4) and blacklisted nodes. + // Intern RM send resync command + + // send deallocate request, release=1 + ContainerAllocatorEvent deallocate2 = + createDeallocateEvent(jobId, 2, false); + allocator.sendDeallocate(deallocate2); + + // Send events to blacklist nodes h3 + ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false); + allocator.sendFailure(f2); + + ContainerRequestEvent event4 = + createReq(jobId, 4, 2000, new String[] { "h1", "h2" }); + allocator.sendRequest(event4); + + // send allocate request to 2nd RM and get resync command + allocator.schedule(); + dispatcher.await(); + Assert.assertTrue("Last allocate response is not RESYNC", + allocator.isResyncCommand()); + + // Step-5 : On Resync,AM sends all outstanding + // asks,release,blacklistAaddition + // and another containerRequest(event5) + ContainerRequestEvent event5 = + createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" }); + allocator.sendRequest(event5); + + // send all outstanding request again. + assignedContainers = allocator.schedule(); + dispatcher.await(); + assertAsksAndReleases(3, 2, rm2); + assertBlacklistAdditionsAndRemovals(2, 0, rm2); + + nm1.nodeHeartbeat(true); + dispatcher.await(); + + // Step-6 : RM allocates containers i.e event3,event4 and cRequest5 + assignedContainers = allocator.schedule(); + dispatcher.await(); + + Assert.assertEquals("Number of container should be 3", 3, + assignedContainers.size()); + + for (TaskAttemptContainerAssignedEvent assig : assignedContainers) { + Assert.assertTrue("Assigned count not correct", + "h1".equals(assig.getContainer().getNodeId().getHost())); + } + + rm1.stop(); + rm2.stop(); + + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple();