hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1537560 [3/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/proto/server/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanage...
Date Thu, 31 Oct 2013 17:25:07 GMT
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Oct 31 17:25:06 2013
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,16 +42,24 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -90,13 +99,12 @@ public class TestRMRestart {
     UserGroupInformation.setConfiguration(conf);
     conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
     conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
-
     rmAddr = new InetSocketAddress("localhost", 8032);
+    Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
   }
 
   @Test (timeout=180000)
   public void testRMRestart() throws Exception {
-    Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
 
@@ -122,7 +130,7 @@ public class TestRMRestart {
     nm1.registerNode();
     nm2.registerNode(); // nm2 will not heartbeat with RM1
     
-    // create app that will not be saved because it will finish
+    // create app that will finish and the final state should be saved.
     RMApp app0 = rm1.submitApp(200);
     RMAppAttempt attempt0 = app0.getCurrentAppAttempt();
     // spot check that app is saved
@@ -130,14 +138,8 @@ public class TestRMRestart {
     nm1.nodeHeartbeat(true);
     MockAM am0 = rm1.sendAMLaunched(attempt0.getAppAttemptId());
     am0.registerAppAttempt();
-    am0.unregisterAppAttempt();
-    nm1.nodeHeartbeat(attempt0.getAppAttemptId(), 1, ContainerState.COMPLETE);
-    am0.waitForState(RMAppAttemptState.FINISHED);
-    rm1.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
-
-    // spot check that app is not saved anymore
-    Assert.assertEquals(0, rmAppState.size());
-        
+    finishApplicationMaster(app0, rm1, nm1, am0);
+
     // create app that gets launched and does allocate before RM restart
     RMApp app1 = rm1.submitApp(200);
     // assert app1 info is saved
@@ -209,7 +211,6 @@ public class TestRMRestart {
         .getApplicationId(), appUnmanaged.getApplicationSubmissionContext()
         .getApplicationId());  
     
-    
     // PHASE 2: create new RM and start from old state
     
     // create new RM to represent restart and recover state
@@ -223,11 +224,17 @@ public class TestRMRestart {
     nm2.setResourceTrackerService(rm2.getResourceTrackerService());
 
     // verify load of old state
-    // only 2 apps are loaded since unmanaged app is not loaded back since it
-    // cannot be restarted by the RM this will change with work preserving RM
-    // restart in which AMs/NMs are not rebooted
-    Assert.assertEquals(2, rm2.getRMContext().getRMApps().size());
-    
+    // 4 apps are loaded.
+    // FINISHED app and attempt is also loaded back.
+    // Unmanaged app state is still loaded back but it cannot be restarted by
+    // the RM. this will change with work preserving RM restart in which AMs/NMs
+    // are not rebooted.
+    Assert.assertEquals(4, rm2.getRMContext().getRMApps().size());
+    // check that earlier finished app and attempt is also loaded back and move
+    // to finished state.
+    rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
+
     // verify correct number of attempts and other data
     RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
     Assert.assertNotNull(loadedApp1);
@@ -331,29 +338,343 @@ public class TestRMRestart {
           new ArrayList<ContainerId>()).getAllocatedContainers());
       Thread.sleep(500);
     }
+    // finish the AMs
+    finishApplicationMaster(loadedApp1, rm2, am1Node, am1);
+    finishApplicationMaster(loadedApp2, rm2, am2Node, am2);
 
-    // finish the AM's
-    am1.unregisterAppAttempt();
-    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.FINISHING);
-    am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE);
-    am1.waitForState(RMAppAttemptState.FINISHED);
-    
-    am2.unregisterAppAttempt();
-    rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.FINISHING);
-    am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE);
-    am2.waitForState(RMAppAttemptState.FINISHED);
-    
     // stop RM's
     rm2.stop();
     rm1.stop();
     
-    // completed apps should be removed
-    Assert.assertEquals(0, rmAppState.size());
+    // completed apps are not removed immediately after app finish
+    // And finished app is also loaded back.
+    Assert.assertEquals(4, rmAppState.size());
  }
-  
+
+  /**
+   * Verifies that an AM failure on an app that still has attempts left is
+   * saved as a FAILED attempt without saving a final app state, and that the
+   * FAILED attempt is loaded back by a restarted RM.
+   */
+  @Test
+  public void testRMRestartAppRunningAMFailed() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch (and register) the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+
+    // fail the AM by sending CONTAINER_FINISHED event without unregistering.
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am0.waitForState(RMAppAttemptState.FAILED);
+
+    ApplicationState appState = rmAppState.get(app0.getApplicationId());
+    // assert the AM failed state is saved.
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+      appState.getAttempt(am0.getApplicationAttemptId()).getState());
+
+    // assert app state has not been saved.
+    Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState());
+
+    // new AM started but not registered, app still stays at ACCEPTED state.
+    rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+    // start new RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    // assert the previous AM state is loaded back on RM recovery.
+    RMApp recoveredApp =
+        rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+    Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
+      .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
+
+    // stop both RMs so their services do not outlive this test.
+    rm2.stop();
+    rm1.stop();
+  }
+
+  /**
+   * Verifies that an app whose only allowed attempt fails has its FAILED app
+   * and attempt states saved, and that a restarted RM recovers it as FAILED
+   * without creating a new attempt.
+   */
+  @Test
+  public void testRMRestartFailedApp() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch (and register) the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+
+    // fail the AM by sending CONTAINER_FINISHED event without unregistering.
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am0.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+    // assert the app/attempt failed state is saved.
+    ApplicationState appState = rmAppState.get(app0.getApplicationId());
+    Assert.assertEquals(RMAppState.FAILED, appState.getState());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+      appState.getAttempt(am0.getApplicationAttemptId()).getState());
+
+    // start new RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+    rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    // no new attempt is created.
+    Assert.assertEquals(1, loadedApp0.getAppAttempts().size());
+
+    verifyAppReportAfterRMRestart(app0, rm2);
+    // Only the pre-restart app's diagnostics are checked here: failed
+    // diagnostics from the attempt are lost because they are not yet
+    // available by the time the app is saving the app state.
+    Assert.assertTrue(app0.getDiagnostics().toString()
+      .contains("Failing the application."));
+
+    // stop both RMs so their services do not outlive this test.
+    rm2.stop();
+    rm1.stop();
+  }
+
+  /**
+   * Verifies that a killed app has its KILLED app and attempt states saved,
+   * and that a restarted RM recovers it as KILLED, without a new attempt and
+   * with the same diagnostics as before the restart.
+   */
+  @Test
+  public void testRMRestartKilledApp() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+
+    // kill the app.
+    rm1.killApp(app0.getApplicationId());
+    rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
+    rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+
+    // killed state is saved.
+    ApplicationState appState = rmAppState.get(app0.getApplicationId());
+    Assert.assertEquals(RMAppState.KILLED, appState.getState());
+    Assert.assertEquals(RMAppAttemptState.KILLED,
+      appState.getAttempt(am0.getApplicationAttemptId()).getState());
+
+    // restart rm
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+    rm2.waitForState(app0.getApplicationId(), RMAppState.KILLED);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+    // no new attempt is created.
+    Assert.assertEquals(1, loadedApp0.getAppAttempts().size());
+
+    ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
+    Assert.assertEquals(app0.getDiagnostics().toString(),
+      appReport.getDiagnostics());
+
+    // stop both RMs so their services do not outlive this test.
+    rm2.stop();
+    rm1.stop();
+  }
+
+  /**
+   * Verifies that the unregister info of a successfully finished app
+   * (diagnostics, final status, tracking URL) and its finish time are saved,
+   * and that the application report after restart reflects them.
+   */
+  @Test
+  public void testRMRestartSucceededApp() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create an app and finish the app.
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+
+    // unregister am
+    FinishApplicationMasterRequest req =
+        FinishApplicationMasterRequest.newInstance(
+          FinalApplicationStatus.SUCCEEDED, "diagnostics", "trackingUrl");
+    finishApplicationMaster(app0, rm1, nm1, am0, req);
+
+    // check the state store about the unregistered info.
+    ApplicationState appState = rmAppState.get(app0.getApplicationId());
+    ApplicationAttemptState attemptState0 =
+      appState.getAttempt(am0.getApplicationAttemptId());
+    Assert.assertEquals("diagnostics", attemptState0.getDiagnostics());
+    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+      attemptState0.getFinalApplicationStatus());
+    Assert.assertEquals("trackingUrl", attemptState0.getFinalTrackingUrl());
+    Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime());
+
+    // restart rm
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+
+    // verify application report returns the same app info as the app info
+    // before RM restarts.
+    ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
+    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+      appReport.getFinalApplicationStatus());
+    Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
+
+    // stop both RMs so their services do not outlive this test.
+    rm2.stop();
+    rm1.stop();
+  }
+
+  @Test
+  public void testRMRestartGetApplicationList() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // a succeeded app.
+    RMApp app0 = rm1.submitApp(200, "name", "user", null,
+      false, "default", 1, null, "myType");
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    finishApplicationMaster(app0, rm1, nm1, am0);
+
+    // a failed app.
+    RMApp app1 = rm1.submitApp(200, "name", "user", null,
+      false, "default", 1, null, "myType");
+    MockAM am1 = launchAM(app1, rm1, nm1);
+    // fail the AM by sending CONTAINER_FINISHED event without registering.
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+
+    // a killed app.
+    RMApp app2 = rm1.submitApp(200, "name", "user", null,
+      false, "default", 1, null, "myType");
+    MockAM am2 = launchAM(app2, rm1, nm1);
+    rm1.killApp(app2.getApplicationId());
+    rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
+    rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+
+    // restart rm
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+
+    GetApplicationsRequest request1 =
+        GetApplicationsRequest.newInstance(EnumSet.of(
+          YarnApplicationState.FINISHED, YarnApplicationState.KILLED,
+          YarnApplicationState.FAILED));
+    GetApplicationsResponse response1 =
+        rm2.getClientRMService().getApplications(request1);
+    List<ApplicationReport> appList1 = response1.getApplicationList();
+
+    // assert all applications exist according to application state after RM
+    // restarts.
+    boolean forApp0 = false, forApp1 = false, forApp2 = false;
+    for (ApplicationReport report : appList1) {
+      if (report.getApplicationId().equals(app0.getApplicationId())) {
+        Assert.assertEquals(YarnApplicationState.FINISHED,
+          report.getYarnApplicationState());
+        forApp0 = true;
+      }
+      if (report.getApplicationId().equals(app1.getApplicationId())) {
+        Assert.assertEquals(YarnApplicationState.FAILED,
+          report.getYarnApplicationState());
+        forApp1 = true;
+      }
+      if (report.getApplicationId().equals(app2.getApplicationId())) {
+        Assert.assertEquals(YarnApplicationState.KILLED,
+          report.getYarnApplicationState());
+        forApp2 = true;
+      }
+    }
+    Assert.assertTrue(forApp0 && forApp1 && forApp2);
+
+    // assert all applications exist according to application type after RM
+    // restarts.
+    Set<String> appTypes = new HashSet<String>();
+    appTypes.add("myType");
+    GetApplicationsRequest request2 =
+        GetApplicationsRequest.newInstance(appTypes);
+    GetApplicationsResponse response2 =
+        rm2.getClientRMService().getApplications(request2);
+    List<ApplicationReport> appList2 = response2.getApplicationList();
+    Assert.assertTrue(3 == appList2.size());
+  }
+
+  /**
+   * Brings the current attempt of {@code app} up to a registered AM.
+   * It sends a node heartbeat from {@code nm}, marks the attempt launched
+   * on {@code rm}, registers it, and waits for the app to reach RUNNING.
+   *
+   * @return the registered mock AM for the current attempt
+   */
+  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    ApplicationAttemptId currentAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    nm.nodeHeartbeat(true);
+    MockAM launchedAm = rm.sendAMLaunched(currentAttemptId);
+    launchedAm.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return launchedAm;
+  }
+
+  /**
+   * Fetches the application report for {@code app} from {@code rm}'s client
+   * service and asserts that its start time, finish time and YARN application
+   * state match those of {@code app}, and that its progress is 1.
+   *
+   * @return the fetched report, for further test-specific assertions
+   */
+  private ApplicationReport verifyAppReportAfterRMRestart(RMApp app, MockRM rm)
+      throws Exception {
+    GetApplicationReportRequest reportRequest =
+        GetApplicationReportRequest.newInstance(app.getApplicationId());
+    GetApplicationReportResponse response =
+        rm.getClientRMService().getApplicationReport(reportRequest);
+    ApplicationReport report = response.getApplicationReport();
+    Assert.assertEquals(app.getStartTime(), report.getStartTime());
+    Assert.assertEquals(app.getFinishTime(), report.getFinishTime());
+    Assert.assertEquals(app.createApplicationState(),
+      report.getYarnApplicationState());
+    // Progress is a float. An exact (zero-delta) assertEquals keeps the
+    // original strictness but reports the actual value on failure.
+    Assert.assertEquals(1.0f, report.getProgress(), 0.0f);
+    return report;
+  }
+
+  /**
+   * Finishes {@code am} with final status SUCCEEDED and empty diagnostics and
+   * tracking URL, by delegating to the overload that takes an explicit
+   * FinishApplicationMasterRequest.
+   */
+  private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+      MockAM am) throws Exception {
+    finishApplicationMaster(rmApp, rm, nm, am,
+        FinishApplicationMasterRequest.newInstance(
+            FinalApplicationStatus.SUCCEEDED, "", ""));
+  }
+
+  /**
+   * Unregisters {@code am} with {@code req} and completes its AM container via
+   * a heartbeat from {@code nm}. It then waits for the attempt and the app to
+   * reach FINISHED and asserts that FINISHED app and attempt states were
+   * written to {@code rm}'s state store.
+   * The store is cast to MemoryRMStateStore, so {@code rm} must use one.
+   */
+  private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+      MockAM am, FinishApplicationMasterRequest req) throws Exception {
+    RMState rmState =
+        ((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+    am.unregisterAppAttempt(req);
+    am.waitForState(RMAppAttemptState.FINISHING);
+    nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+    rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
+    // check that app/attempt is saved with the final state; a missing entry
+    // fails as an assertion rather than an NPE.
+    ApplicationState appState = rmAppState.get(rmApp.getApplicationId());
+    Assert.assertNotNull("no saved state for " + rmApp.getApplicationId(),
+      appState);
+    Assert.assertEquals(RMAppState.FINISHED, appState.getState());
+    ApplicationAttemptState attemptState =
+        appState.getAttempt(am.getApplicationAttemptId());
+    Assert.assertNotNull(
+      "no saved state for " + am.getApplicationAttemptId(), attemptState);
+    Assert.assertEquals(RMAppAttemptState.FINISHED, attemptState.getState());
+  }
+
   @Test
   public void testRMRestartOnMaxAppAttempts() throws Exception {
-    Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
 
@@ -407,16 +728,17 @@ public class TestRMRestart {
         rm2.getRMContext().getRMApps().get(app2.getApplicationId())
         .getMaxAppAttempts());
 
-    // verify that app2 exists  app1 is removed
-    Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
-    Assert.assertNotNull(rm2.getRMContext().getRMApps()
-        .get(app2.getApplicationId()));
-    Assert.assertNull(rm2.getRMContext().getRMApps()
-        .get(app1.getApplicationId()));
-
-    // verify that app2 is stored, app1 is removed
-    Assert.assertNotNull(rmAppState.get(app2.getApplicationId()));
-    Assert.assertNull(rmAppState.get(app1.getApplicationId()));
+    // app1 and app2 are loaded back, but app1 failed because it's
+    // hitting max-retry.
+    Assert.assertEquals(2, rm2.getRMContext().getRMApps().size());
+    rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+    rm2.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+
+    // app1 failed state is saved in state store. app2 final saved state is not
+    // determined yet.
+    Assert.assertEquals(RMAppState.FAILED,
+      rmAppState.get(app1.getApplicationId()).getState());
+    Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState());
 
     // stop the RM  
     rm1.stop();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Thu Oct 31 17:25:06 2013
@@ -26,10 +26,8 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 import javax.crypto.SecretKey;
@@ -39,13 +37,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ha.ClientBaseWithFixes;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -54,6 +46,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -66,22 +59,20 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
-import org.apache.zookeeper.ZooKeeper;
-
-import org.junit.Test;
-
 public class RMStateStoreTestBase extends ClientBaseWithFixes{
 
   public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
 
   static class TestDispatcher implements
-      Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
+      Dispatcher, EventHandler<RMAppAttemptNewSavedEvent> {
 
     ApplicationAttemptId attemptId;
     Exception storedException;
@@ -95,7 +86,7 @@ public class RMStateStoreTestBase extend
     }
 
     @Override
-    public void handle(RMAppAttemptStoredEvent event) {
+    public void handle(RMAppAttemptNewSavedEvent event) {
       assertEquals(attemptId, event.getApplicationAttemptId());
       assertEquals(storedException, event.getStoredException());
       notified = true;
@@ -134,18 +125,19 @@ public class RMStateStoreTestBase extend
     dispatcher.notified = false;
   }
 
-  void storeApp(
-      RMStateStore store, ApplicationId appId, long time) throws Exception {
+  void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
+      long startTime) throws Exception {
     ApplicationSubmissionContext context =
         new ApplicationSubmissionContextPBImpl();
     context.setApplicationId(appId);
 
     RMApp mockApp = mock(RMApp.class);
     when(mockApp.getApplicationId()).thenReturn(appId);
-    when(mockApp.getSubmitTime()).thenReturn(time);
+    when(mockApp.getSubmitTime()).thenReturn(submitTime);
+    when(mockApp.getStartTime()).thenReturn(startTime);
     when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
     when(mockApp.getUser()).thenReturn("test");
-    store.storeApplication(mockApp);
+    store.storeNewApplication(mockApp);
   }
 
   ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
@@ -163,7 +155,7 @@ public class RMStateStoreTestBase extend
         .thenReturn(clientTokenMasterKey);
     dispatcher.attemptId = attemptId;
     dispatcher.storedException = null;
-    store.storeApplicationAttempt(mockAttempt);
+    store.storeNewApplicationAttempt(mockAttempt);
     waitNotify(dispatcher);
     return container.getId();
   }
@@ -171,6 +163,7 @@ public class RMStateStoreTestBase extend
   void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
       throws Exception {
     long submitTime = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis() + 1234;
     Configuration conf = new YarnConfiguration();
     RMStateStore store = stateStoreHelper.getRMStateStore();
     TestDispatcher dispatcher = new TestDispatcher();
@@ -184,7 +177,7 @@ public class RMStateStoreTestBase extend
     ApplicationAttemptId attemptId1 = ConverterUtils
         .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
     ApplicationId appId1 = attemptId1.getApplicationId();
-    storeApp(store, appId1, submitTime);
+    storeApp(store, appId1, submitTime, startTime);
 
     // create application token and client token key for attempt1
     Token<AMRMTokenIdentifier> appAttemptToken1 =
@@ -217,7 +210,7 @@ public class RMStateStoreTestBase extend
     ApplicationAttemptId attemptIdRemoved = ConverterUtils
         .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
     ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
-    storeApp(store, appIdRemoved, submitTime);
+    storeApp(store, appIdRemoved, submitTime, startTime);
     storeAttempt(store, attemptIdRemoved,
         "container_1352994193343_0002_01_000001", null, null, dispatcher);
 
@@ -241,6 +234,7 @@ public class RMStateStoreTestBase extend
 
     // load state
     store = stateStoreHelper.getRMStateStore();
+    store.setRMDispatcher(dispatcher);
     RMState state = store.loadState();
     Map<ApplicationId, ApplicationState> rmAppState =
         state.getApplicationState();
@@ -250,6 +244,7 @@ public class RMStateStoreTestBase extend
     assertNotNull(appState);
     // app is loaded correctly
     assertEquals(submitTime, appState.getSubmitTime());
+    assertEquals(startTime, appState.getStartTime());
     // submission context is loaded correctly
     assertEquals(appId1,
                  appState.getApplicationSubmissionContext().getApplicationId());
@@ -283,6 +278,59 @@ public class RMStateStoreTestBase extend
         attemptState.getAppAttemptCredentials()
         .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
 
+    //******* update application/attempt state *******//
+    ApplicationState appState2 =
+        new ApplicationState(appState.submitTime, appState.startTime,
+          appState.context, appState.user, RMAppState.FINISHED,
+          "appDiagnostics", 1234);
+    appState2.attempts.putAll(appState.attempts);
+    store.updateApplicationState(appState2);
+
+    ApplicationAttemptState oldAttemptState = attemptState;
+    ApplicationAttemptState newAttemptState =
+        new ApplicationAttemptState(oldAttemptState.getAttemptId(),
+          oldAttemptState.getMasterContainer(),
+          oldAttemptState.getAppAttemptCredentials(),
+          oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
+          "myTrackingUrl", "attemptDiagnostics",
+          FinalApplicationStatus.SUCCEEDED);
+    store.updateApplicationAttemptState(newAttemptState);
+    // let things settle down
+    Thread.sleep(1000);
+    store.close();
+
+    // check updated application state.
+    store = stateStoreHelper.getRMStateStore();
+    store.setRMDispatcher(dispatcher);
+    RMState newRMState = store.loadState();
+    Map<ApplicationId, ApplicationState> newRMAppState =
+        newRMState.getApplicationState();
+    ApplicationState updatedAppState = newRMAppState.get(appId1);
+    assertEquals(appState.getAppId(),updatedAppState.getAppId());
+    assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime());
+    assertEquals(appState.getStartTime(), updatedAppState.getStartTime());
+    assertEquals(appState.getUser(), updatedAppState.getUser());
+    // new app state fields
+    assertEquals( RMAppState.FINISHED, updatedAppState.getState());
+    assertEquals("appDiagnostics", updatedAppState.getDiagnostics());
+    assertEquals(1234, updatedAppState.getFinishTime());
+
+    // check updated attempt state
+    ApplicationAttemptState updatedAttemptState =
+        updatedAppState.getAttempt(newAttemptState.getAttemptId());
+    assertEquals(oldAttemptState.getAttemptId(),
+      updatedAttemptState.getAttemptId());
+    assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId());
+    assertArrayEquals(clientTokenKey2.getEncoded(),
+      updatedAttemptState.getAppAttemptCredentials().getSecretKey(
+        RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+    // new attempt state fields
+    assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
+    assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
+    assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
+    assertEquals(FinalApplicationStatus.SUCCEEDED,
+      updatedAttemptState.getFinalApplicationStatus());
+
     // assert store is in expected state after everything is cleaned
     assertTrue(stateStoreHelper.isFinalStateValid());
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Thu Oct 31 17:25:06 2013
@@ -19,19 +19,18 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.util.Collection;
-
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -226,5 +225,5 @@ public class MockRMApp implements RMApp 
   @Override
   public YarnApplicationState createApplicationState() {
     return null;
-  };
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Thu Oct 31 17:25:06 2013
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -57,13 +58,15 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -286,7 +289,8 @@ public class TestRMAppTransitions {
   }
   
   // test to make sure times are set when app finishes
-  private static void assertTimesAtFinish(RMApp application) {
+  private void assertTimesAtFinish(RMApp application) {
+    sendAppUpdateSavedEvent(application);
     assertStartTimeSet(application);
     Assert.assertTrue("application finish time is not greater then 0",
         (application.getFinishTime() > 0)); 
@@ -294,11 +298,12 @@ public class TestRMAppTransitions {
         (application.getFinishTime() >= application.getStartTime()));
   }
 
-  private void assertAppRemoved(RMApp application){
-    verify(store).removeApplication(application);
+  private void assertAppFinalStateSaved(RMApp application){
+    verify(store, times(1)).updateApplicationState(any(ApplicationState.class));
   }
 
-  private static void assertKilled(RMApp application) {
+  private void assertKilled(RMApp application) {
+    sendAppUpdateSavedEvent(application);
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
     assertFinalAppStatus(FinalApplicationStatus.KILLED, application);
@@ -307,20 +312,34 @@ public class TestRMAppTransitions {
         "Application killed by user.", diag.toString());
   }
 
-  private static void assertAppAndAttemptKilled(RMApp application) throws InterruptedException {
+  private void assertAppAndAttemptKilled(RMApp application)
+      throws InterruptedException {
     assertKilled(application);
-    Assert.assertEquals( RMAppAttemptState.KILLED, 
-        application.getCurrentAppAttempt().getAppAttemptState() 
-        );
+    // send attempt final state saved event.
+    application.getCurrentAppAttempt().handle(
+      new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
+        .getAppAttemptId(), null));
+    Assert.assertEquals(RMAppAttemptState.KILLED, application
+      .getCurrentAppAttempt().getAppAttemptState());
+    assertAppFinalStateSaved(application);
   }
 
-  private static void assertFailed(RMApp application, String regex) {
+  private void assertFailed(RMApp application, String regex) {
+    sendAppUpdateSavedEvent(application);
     assertTimesAtFinish(application);
     assertAppState(RMAppState.FAILED, application);
     assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
     StringBuilder diag = application.getDiagnostics();
     Assert.assertTrue("application diagnostics is not correct",
         diag.toString().matches(regex));
+    assertAppFinalStateSaved(application);
+  }
+
+  private void sendAppUpdateSavedEvent(RMApp application) {
+    RMAppEvent event =
+        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+    application.handle(event);
+    rmDispatcher.await();
   }
 
   protected RMApp testCreateAppNewSaving(
@@ -340,7 +359,7 @@ public class TestRMAppTransitions {
   RMApp application = testCreateAppNewSaving(submissionContext);
     // NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
     RMAppEvent event =
-        new RMAppStoredEvent(application.getApplicationId(), null);
+        new RMAppNewSavedEvent(application.getApplicationId(), null);
     application.handle(event);
     assertStartTimeSet(application);
     assertAppState(RMAppState.SUBMITTED, application);
@@ -386,15 +405,15 @@ public class TestRMAppTransitions {
     return application;
   }
 
-  protected RMApp testCreateAppRemoving(
+  protected RMApp testCreateAppFinalSaving(
       ApplicationSubmissionContext submissionContext) throws IOException {
     RMApp application = testCreateAppRunning(submissionContext);
     RMAppEvent finishingEvent =
         new RMAppEvent(application.getApplicationId(),
           RMAppEventType.ATTEMPT_UNREGISTERED);
     application.handle(finishingEvent);
-    assertAppState(RMAppState.REMOVING, application);
-    assertAppRemoved(application);
+    assertAppState(RMAppState.FINAL_SAVING, application);
+    assertAppFinalStateSaved(application);
     return application;
   }
 
@@ -402,11 +421,11 @@ public class TestRMAppTransitions {
       ApplicationSubmissionContext submissionContext) throws IOException {
     // unmanaged AMs don't use the FINISHING state
     assert submissionContext == null || !submissionContext.getUnmanagedAM();
-    RMApp application = testCreateAppRemoving(submissionContext);
-    // REMOVING => FINISHING event RMAppEventType.APP_REMOVED
-    RMAppEvent finishingEvent =
-        new RMAppRemovedEvent(application.getApplicationId(), null);
-    application.handle(finishingEvent);
+    RMApp application = testCreateAppFinalSaving(submissionContext);
+    // FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
+    RMAppEvent appUpdated =
+        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+    application.handle(appUpdated);
     assertAppState(RMAppState.FINISHING, application);
     assertTimesAtFinish(application);
     return application;
@@ -552,7 +571,6 @@ public class TestRMAppTransitions {
         RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-    assertKilled(application);
     assertAppAndAttemptKilled(application);
   }
 
@@ -597,7 +615,6 @@ public class TestRMAppTransitions {
         RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-    assertKilled(application);
     assertAppAndAttemptKilled(application);
   }
 
@@ -611,6 +628,14 @@ public class TestRMAppTransitions {
         new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
+
+    // Attempt_Finished is ignored at FINAL_SAVING when the app is being killed.
+    assertAppState(RMAppState.FINAL_SAVING, application);
+    RMAppEvent finishEvent =
+        new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
+    application.handle(finishEvent);
+    assertAppState(RMAppState.FINAL_SAVING, application);
+
     assertKilled(application);
   }
 
@@ -666,40 +691,43 @@ public class TestRMAppTransitions {
   }
 
   @Test
-  public void testAppRemovingFinished() throws IOException {
-    LOG.info("--- START: testAppRemovingFINISHED ---");
-    RMApp application = testCreateAppRemoving(null);
-    // APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
-    RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent(
-      application.getApplicationId(), null);
-    application.handle(finishedEvent);
-    rmDispatcher.await();
-    assertAppState(RMAppState.FINISHED, application);
-  }
+  public void testAppFinishingKill() throws IOException {
+    LOG.info("--- START: testAppFinishedFinished ---");
 
-  @Test
-  public void testAppRemovingKilled() throws IOException {
-    LOG.info("--- START: testAppRemovingKilledD ---");
-    RMApp application = testCreateAppRemoving(null);
-    // APP_REMOVING => KILLED event RMAppEventType.KILL
+    RMApp application = testCreateAppFinishing(null);
+    // FINISHING => FINISHED event RMAppEventType.KILL
     RMAppEvent event =
         new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-    assertAppState(RMAppState.KILLED, application);
+    assertAppState(RMAppState.FINISHED, application);
   }
 
+  // While the app is at FINAL_SAVING, the Attempt_Finished event may arrive
+  // before the App_Saved event. We stay in FINAL_SAVING on Attempt_Finished
+  // and then jump directly from FINAL_SAVING to FINISHED on the App_Saved
+  // event.
   @Test
-  public void testAppFinishingKill() throws IOException {
-    LOG.info("--- START: testAppFinishedFinished ---");
+  public void testAppFinalSavingToFinished() throws IOException {
+    LOG.info("--- START: testAppFinalSavingToFinished ---");
 
-    RMApp application = testCreateAppFinishing(null);
-    // FINISHING => FINISHED event RMAppEventType.KILL
+    RMApp application = testCreateAppFinalSaving(null);
+    final String diagMsg = "some diagnostics";
+    // the attempt_finished event comes before the app_saved event
     RMAppEvent event =
-        new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+        new RMAppFinishedAttemptEvent(application.getApplicationId(), diagMsg);
     application.handle(event);
-    rmDispatcher.await();
+    assertAppState(RMAppState.FINAL_SAVING, application);
+    RMAppEvent appUpdated =
+        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+    application.handle(appUpdated);
     assertAppState(RMAppState.FINISHED, application);
+
+    assertTimesAtFinish(application);
+    // finished without a proper unregister implies failed
+    assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
+    Assert.assertTrue("Finished app missing diagnostics", application
+      .getDiagnostics().indexOf(diagMsg) != -1);
   }
 
   @Test
@@ -742,7 +770,7 @@ public class TestRMAppTransitions {
     assertAppState(RMAppState.FAILED, application);
 
     // FAILED => FAILED event RMAppEventType.APP_SAVED
-    event = new RMAppStoredEvent(application.getApplicationId(), null);
+    event = new RMAppNewSavedEvent(application.getApplicationId(), null);
     application.handle(event);
     rmDispatcher.await();
     assertTimesAtFinish(application);
@@ -797,7 +825,7 @@ public class TestRMAppTransitions {
     assertAppState(RMAppState.KILLED, application);
 
     // KILLED => KILLED event RMAppEventType.APP_SAVED
-    event = new RMAppStoredEvent(application.getApplicationId(), null);
+    event = new RMAppNewSavedEvent(application.getApplicationId(), null);
     application.handle(event);
     rmDispatcher.await();
     assertTimesAtFinish(application);
@@ -873,7 +901,7 @@ public class TestRMAppTransitions {
     attempt.handle(new RMAppAttemptContainerAllocatedEvent(attempt
       .getAppAttemptId(), container));
     attempt
-      .handle(new RMAppAttemptStoredEvent(attempt.getAppAttemptId(), null));
+      .handle(new RMAppAttemptNewSavedEvent(attempt.getAppAttemptId(), null));
     attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
       RMAppAttemptEventType.LAUNCHED));
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Thu Oct 31 17:25:06 2013
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -75,8 +76,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -114,7 +116,8 @@ public class TestRMAppAttemptTransitions
   private ApplicationMasterLauncher applicationMasterLauncher;
   private AMLivelinessMonitor amLivelinessMonitor;
   private AMLivelinessMonitor amFinishingMonitor;
-  
+  private RMStateStore store;
+
   private RMApp application;
   private RMAppAttempt applicationAttempt;
 
@@ -209,7 +212,7 @@ public class TestRMAppAttemptTransitions
           new NMTokenSecretManagerInRM(conf),
           clientToAMTokenManager);
     
-    RMStateStore store = mock(RMStateStore.class);
+    store = mock(RMStateStore.class);
     ((RMContextImpl) rmContext).setStateStore(store);
     
     scheduler = mock(YarnScheduler.class);
@@ -330,6 +333,7 @@ public class TestRMAppAttemptTransitions
    * {@link RMAppAttemptState#SUBMITTED} -> {@link RMAppAttemptState#FAILED}
    */
   private void testAppAttemptSubmittedToFailedState(String diagnostics) {
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED, 
         applicationAttempt.getAppAttemptState());
     assertEquals(diagnostics, applicationAttempt.getDiagnostics());
@@ -354,6 +358,7 @@ public class TestRMAppAttemptTransitions
    */
   private void testAppAttemptKilledState(Container amContainer, 
       String diagnostics) {
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.KILLED, 
         applicationAttempt.getAppAttemptState());
     assertEquals(diagnostics, applicationAttempt.getDiagnostics());
@@ -363,6 +368,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0, applicationAttempt.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+    verifyAttemptFinalStateSaved();
   }
   
   /**
@@ -427,6 +433,7 @@ public class TestRMAppAttemptTransitions
    */
   private void testAppAttemptFailedState(Container container, 
       String diagnostics) {
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED, 
         applicationAttempt.getAppAttemptState());
     assertEquals(diagnostics, applicationAttempt.getDiagnostics());
@@ -437,8 +444,8 @@ public class TestRMAppAttemptTransitions
     
     // Check events
     verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
-
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+    verifyAttemptFinalStateSaved();
   }
 
   /**
@@ -492,6 +499,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 0);
+    verifyAttemptFinalStateSaved();
   }
 
   /**
@@ -507,11 +515,11 @@ public class TestRMAppAttemptTransitions
     assertEquals(diagnostics, applicationAttempt.getDiagnostics());
     verifyUrl(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
     if (unmanagedAM) {
-      verifyUrl(trackingUrl, applicationAttempt.getTrackingUrl());
-      
+      verifyUrl(trackingUrl, applicationAttempt.getTrackingUrl()); 
     } else {
       assertEquals(getProxyUrl(applicationAttempt),
           applicationAttempt.getTrackingUrl());
+      verifyAttemptFinalStateSaved();
     }
     assertEquals(finishedContainerCount, applicationAttempt
         .getJustFinishedContainers().size());
@@ -539,7 +547,7 @@ public class TestRMAppAttemptTransitions
       assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
           applicationAttempt.getAppAttemptState());
       applicationAttempt.handle(
-          new RMAppAttemptStoredEvent(
+          new RMAppAttemptNewSavedEvent(
               applicationAttempt.getAppAttemptId(), null));
     }
     
@@ -576,7 +584,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(RMAppAttemptState.ALLOCATED_SAVING, 
         applicationAttempt.getAppAttemptState());
     applicationAttempt.handle(
-        new RMAppAttemptStoredEvent(
+        new RMAppAttemptNewSavedEvent(
             applicationAttempt.getAppAttemptId(), null));
     
     testAppAttemptAllocatedState(container);
@@ -617,6 +625,7 @@ public class TestRMAppAttemptTransitions
         new RMAppAttemptUnregistrationEvent(
             applicationAttempt.getAppAttemptId(),
             trackingUrl, finalStatus, diagnostics));
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     testAppAttemptFinishingState(container, finalStatus,
         trackingUrl, diagnostics);
   }
@@ -647,7 +656,15 @@ public class TestRMAppAttemptTransitions
     testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1,
         true);
   }
-  
+
+  private void sendAttemptUpdateSavedEvent(RMAppAttempt applicationAttempt) {
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState());
+    applicationAttempt.handle(
+      new RMAppAttemptUpdateSavedEvent(
+          applicationAttempt.getAppAttemptId(), null));
+  }
+
   @Test
   public void testUnmanagedAMUnexpectedRegistration() {
     unmanagedAM = true;
@@ -745,6 +762,7 @@ public class TestRMAppAttemptTransitions
           ContainerState.COMPLETE, containerDiagMsg, exitCode);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
       applicationAttempt.getAppAttemptId(), cs));
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED,
       applicationAttempt.getAppAttemptState());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
@@ -762,6 +780,20 @@ public class TestRMAppAttemptTransitions
     ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
         appAttemptId, cs));
+
+    // ContainerFinished and Expire are ignored at FINAL_SAVING when the
+    // attempt is headed to the FAILED state.
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState()); 
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+      applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
+        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+    applicationAttempt.handle(new RMAppAttemptEvent(
+      applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState()); 
+
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED,
         applicationAttempt.getAppAttemptState());
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
@@ -782,6 +814,20 @@ public class TestRMAppAttemptTransitions
         new RMAppAttemptEvent(
             applicationAttempt.getAppAttemptId(),
             RMAppAttemptEventType.KILL));
+
+    // ContainerFinished and Expire are ignored at FINAL_SAVING when the
+    // attempt is headed to the KILLED state.
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState()); 
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+      applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
+        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+    applicationAttempt.handle(new RMAppAttemptEvent(
+      applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState()); 
+
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.KILLED,
         applicationAttempt.getAppAttemptState());
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
@@ -800,6 +846,7 @@ public class TestRMAppAttemptTransitions
     launchApplicationAttempt(amContainer);
     applicationAttempt.handle(new RMAppAttemptEvent(
         applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED,
         applicationAttempt.getAppAttemptState());
     assertTrue("expire diagnostics missing",
@@ -818,6 +865,7 @@ public class TestRMAppAttemptTransitions
     runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
     applicationAttempt.handle(new RMAppAttemptEvent(
         applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED,
         applicationAttempt.getAppAttemptState());
     assertTrue("expire diagnostics missing",
@@ -962,7 +1010,64 @@ public class TestRMAppAttemptTransitions
     testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
         diagnostics, 0, false);
   }
-  
+
+  // While the attempt is at FINAL_SAVING, the Container_Finished event may
+  // arrive before the Attempt_Saved event. We stay in FINAL_SAVING on
+  // Container_Finished and then jump directly from FINAL_SAVING to FINISHED
+  // on the Attempt_Saved event.
+  @Test
+  public void
+      testFinalSavingToFinishedWithContainerFinished() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+    FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
+    String trackingUrl = "mytrackingurl";
+    String diagnostics = "Successful";
+    applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
+      applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus,
+      diagnostics));
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState());
+    // Container_Finished event comes before Attempt_Saved event.
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+      applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
+        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState());
+    // send attempt_saved
+    sendAttemptUpdateSavedEvent(applicationAttempt);
+    testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
+      diagnostics, 0, false);
+  }
+
+  // While the attempt is at FINAL_SAVING, the Expire event may arrive before
+  // the Attempt_Saved event. We stay in FINAL_SAVING on Expire and then jump
+  // directly from FINAL_SAVING to FINISHED on the Attempt_Saved event.
+  @Test
+  public void testFinalSavingToFinishedWithExpire() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+    FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
+    String trackingUrl = "mytrackingurl";
+    String diagnostics = "Successssseeeful";
+    applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
+      applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus,
+      diagnostics));
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState());
+    // Expire event comes before Attempt_Saved event.
+    applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
+      .getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
+    assertEquals(RMAppAttemptState.FINAL_SAVING,
+      applicationAttempt.getAppAttemptState());
+    // send attempt_saved
+    sendAttemptUpdateSavedEvent(applicationAttempt);
+    testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
+      diagnostics, 0, false);
+  }
+
   private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
     verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -980,4 +1085,9 @@ public class TestRMAppAttemptTransitions
       assertEquals(url1, url2);
     }
   }
+
+  private void verifyAttemptFinalStateSaved() {
+    verify(store, times(1)).updateApplicationAttemptState(
+      any(ApplicationAttemptState.class));
+  }
 }



Mime
View raw message