Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B2A2A11EDD for ; Sat, 28 Jun 2014 23:42:14 +0000 (UTC) Received: (qmail 10052 invoked by uid 500); 28 Jun 2014 23:42:14 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 10011 invoked by uid 500); 28 Jun 2014 23:42:14 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 10000 invoked by uid 99); 28 Jun 2014 23:42:14 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 28 Jun 2014 23:42:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 28 Jun 2014 23:42:12 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0D04B238890D; Sat, 28 Jun 2014 23:41:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1606408 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server/hadoo... 
Date: Sat, 28 Jun 2014 23:41:51 -0000 To: yarn-commits@hadoop.apache.org From: jianhe@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140628234152.0D04B238890D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jianhe Date: Sat Jun 28 23:41:51 2014 New Revision: 1606408 URL: http://svn.apache.org/r1606408 Log: Merge r1606407 from trunk. YARN-614. Changed ResourceManager to not count disk failure, node loss and RM restart towards app failures. Contributed by Xuan Gong Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1606408&r1=1606407&r2=1606408&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Jun 28 23:41:51 2014 @@ -180,6 +180,9 @@ Release 2.5.0 - UNRELEASED YARN-2171. 
Improved CapacityScheduling to not lock on nodemanager-count when AMs heartbeat in. (Jason Lowe via vinodkv) + YARN-614. Changed ResourceManager to not count disk failure, node loss and + RM restart towards app failures. (Xuan Gong via jianhe) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1606408&r1=1606407&r2=1606408&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Sat Jun 28 23:41:51 2014 @@ -687,9 +687,10 @@ public class RMAppImpl implements RMApp, new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, submissionContext, conf, // The newly created attempt maybe last attempt if (number of - // previously NonPreempted attempts + 1) equal to the max-attempt + // previously failed attempts(which should not include Preempted, + // hardware error and NM resync) + 1) equal to the max-attempt // limit. 
- maxAppAttempts == (getNumNonPreemptedAppAttempts() + 1)); + maxAppAttempts == (getNumFailedAppAttempts() + 1)); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } @@ -797,7 +798,7 @@ public class RMAppImpl implements RMApp, && (app.currentAttempt.getState() == RMAppAttemptState.KILLED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED || (app.currentAttempt.getState() == RMAppAttemptState.FAILED - && app.getNumNonPreemptedAppAttempts() == app.maxAppAttempts))) { + && app.getNumFailedAppAttempts() == app.maxAppAttempts))) { return RMAppState.ACCEPTED; } @@ -888,7 +889,7 @@ public class RMAppImpl implements RMApp, msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (getNumNonPreemptedAppAttempts() >= this.maxAppAttempts) { + } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -1105,11 +1106,12 @@ public class RMAppImpl implements RMApp, }; } - private int getNumNonPreemptedAppAttempts() { + private int getNumFailedAppAttempts() { int completedAttempts = 0; - // Do not count AM preemption as attempt failure. + // Do not count AM preemption, hardware failures or NM resync + // as attempt failure. 
for (RMAppAttempt attempt : attempts.values()) { - if (!attempt.isPreempted()) { + if (attempt.shouldCountTowardsMaxAttemptRetry()) { completedAttempts++; } } @@ -1129,7 +1131,7 @@ public class RMAppImpl implements RMApp, public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (!app.submissionContext.getUnmanagedAM() - && app.getNumNonPreemptedAppAttempts() < app.maxAppAttempts) { + && app.getNumFailedAppAttempts() < app.maxAppAttempts) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1606408&r1=1606407&r2=1606408&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Sat Jun 28 23:41:51 2014 @@ -197,8 +197,14 @@ public interface RMAppAttempt extends Ev ApplicationAttemptReport createApplicationAttemptReport(); /** - * Return the flag which indicates whether the attempt is preempted by the - * scheduler. + * Return the flag which indicates whether the attempt failure should be + * counted to attempt retry count. + *
    + * There failure types should not be counted to attempt retry count:
    + * <ul>
    + * <li>preempted by the scheduler.</li>
    + * <li>hardware failures, such as NM failing, lost NM and NM disk errors.</li>
    + * <li>killed by RM because of RM restart or failover.</li>
    + * </ul>
    + *
*/ - boolean isPreempted(); + boolean shouldCountTowardsMaxAttemptRetry(); } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1606408&r1=1606407&r2=1606408&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Sat Jun 28 23:41:51 2014 @@ -149,9 +149,10 @@ public class RMAppAttemptImpl implements private int amContainerExitStatus = ContainerExitStatus.INVALID; private Configuration conf; - // Since AM preemption is not counted towards AM failure count, - // even if this flag is true, a new attempt can still be re-created if this - // attempt is eventually preempted. So this flag indicates that this may be + // Since AM preemption, hardware error and NM resync are not counted towards + // AM failure count, even if this flag is true, a new attempt can still be + // re-created if this attempt is eventually failed because of preemption, + // hardware error or NM resync. So this flag indicates that this may be // last attempt. 
private final boolean maybeLastAttempt; private static final ExpiredTransition EXPIRED_TRANSITION = @@ -1087,12 +1088,13 @@ public class RMAppAttemptImpl implements .getKeepContainersAcrossApplicationAttempts() && !appAttempt.submissionContext.getUnmanagedAM()) { // See if we should retain containers for non-unmanaged applications - if (appAttempt.isPreempted()) { - // Premption doesn't count towards app-failures and so we should - // retain containers. + if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) { + // Premption, hardware failures, NM resync doesn't count towards + // app-failures and so we should retain containers. keepContainersAcrossAppAttempts = true; } else if (!appAttempt.maybeLastAttempt) { - // Not preemption. Not last-attempt too - keep containers. + // Not preemption, hardware failures or NM resync. + // Not last-attempt too - keep containers. keepContainersAcrossAppAttempts = true; } } @@ -1136,8 +1138,17 @@ public class RMAppAttemptImpl implements } @Override - public boolean isPreempted() { - return getAMContainerExitStatus() == ContainerExitStatus.PREEMPTED; + public boolean shouldCountTowardsMaxAttemptRetry() { + try { + this.readLock.lock(); + int exitStatus = getAMContainerExitStatus(); + return !(exitStatus == ContainerExitStatus.PREEMPTED + || exitStatus == ContainerExitStatus.ABORTED + || exitStatus == ContainerExitStatus.DISKS_FAILED + || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + } finally { + this.readLock.unlock(); + } } private static final class UnmanagedAMAttemptSavedTransition Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1606408&r1=1606407&r2=1606408&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Sat Jun 28 23:41:51 2014 @@ -19,13 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import 
org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -347,15 +352,20 @@ public class TestAMRestart { rm1.stop(); } - // AM container preempted should not be counted towards AM max retry count. - @Test(timeout = 20000) - public void testAMPreemptedNotCountedForAMFailures() throws Exception { + // AM container preempted, nm disk failure + // should not be counted towards AM max retry count. + @Test(timeout = 100000) + public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); // explicitly set max-am-retry count as 1. conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MockRM rm1 = new MockRM(conf); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); @@ -371,8 +381,10 @@ public class TestAMRestart { scheduler.killContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); - Assert.assertTrue(attempt1.isPreempted()); + Assert.assertTrue(! 
attempt1.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); // AM should be restarted even though max-am-attempt is 1. MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); @@ -384,20 +396,62 @@ public class TestAMRestart { scheduler.killContainer(scheduler.getRMContainer(amContainer2)); am2.waitForState(RMAppAttemptState.FAILED); - Assert.assertTrue(attempt2.isPreempted()); + Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1); RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); - // fail the AM normally - nm1.nodeHeartbeat(am3.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + // mimic NM disk_failure + ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); + containerStatus.setContainerId(attempt3.getMasterContainer().getId()); + containerStatus.setDiagnostics("mimic NM disk_failure"); + containerStatus.setState(ContainerState.COMPLETE); + containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED); + Map> conts = + new HashMap>(); + conts.put(app1.getApplicationId(), + Collections.singletonList(containerStatus)); + nm1.nodeHeartbeat(conts, true); + am3.waitForState(RMAppAttemptState.FAILED); - Assert.assertFalse(attempt3.isPreempted()); + Assert.assertTrue(! 
attempt3.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.DISKS_FAILED, + appState.getAttempt(am3.getApplicationAttemptId()) + .getAMContainerExitStatus()); + + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + RMAppAttempt attempt4 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); + + // create second NM, and register to rm1 + MockNM nm2 = + new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + // nm1 heartbeats to report unhealthy + // This will mimic ContainerExitStatus.ABORT + nm1.nodeHeartbeat(false); + am4.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.ABORTED, + appState.getAttempt(am4.getApplicationAttemptId()) + .getAMContainerExitStatus()); + // launch next AM in nm2 + nm2.nodeHeartbeat(true); + MockAM am5 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2); + RMAppAttempt attempt5 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt()); + // fail the AM normally + nm2 + .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am5.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry()); // AM should not be restarted. rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); - Assert.assertEquals(3, app1.getAppAttempts().size()); + Assert.assertEquals(5, app1.getAppAttempts().size()); rm1.stop(); } @@ -433,7 +487,7 @@ public class TestAMRestart { scheduler.killContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); - Assert.assertTrue(attempt1.isPreempted()); + Assert.assertTrue(! 
attempt1.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); // state store has 1 attempt stored. @@ -457,10 +511,73 @@ public class TestAMRestart { RMAppAttempt attempt2 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()) .getCurrentAppAttempt(); - Assert.assertFalse(attempt2.isPreempted()); + Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.INVALID, + appState.getAttempt(am2.getApplicationAttemptId()) + .getAMContainerExitStatus()); + rm1.stop(); + rm2.stop(); + } + + // Test regular RM restart/failover, new RM should not count + // AM failure towards the max-retry-account and should be able to + // re-launch the AM. + @Test(timeout = 50000) + public void testRMRestartOrFailoverNotCountedForAMFailures() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + // AM should be restarted even though max-am-attempt is 1. + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt()); + + // Restart rm. 
+ MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); + // re-register the NM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus status = Records.newRecord(NMContainerStatus.class); + status + .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + status.setContainerId(attempt1.getMasterContainer().getId()); + status.setContainerState(ContainerState.COMPLETE); + status.setDiagnostics(""); + nm1.registerNode(Collections.singletonList(status), null); + + rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED); + Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, + appState.getAttempt(am1.getApplicationAttemptId()) + .getAMContainerExitStatus()); + // Will automatically start a new AppAttempt in rm2 + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am2 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); + RMAppAttempt attempt3 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()) + .getCurrentAppAttempt(); + Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry()); Assert.assertEquals(ContainerExitStatus.INVALID, appState.getAttempt(am2.getApplicationAttemptId()) .getAMContainerExitStatus()); + rm1.stop(); rm2.stop(); }