hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1537560 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/proto/server/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanage...
Date Thu, 31 Oct 2013 17:25:07 GMT
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Oct 31 17:25:06 2013
@@ -54,10 +54,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -103,7 +101,8 @@ public class RMAppImpl implements RMApp,
 
   // Mutable fields
   private long startTime;
-  private long finishTime;
+  private long finishTime = 0;
+  private long storedFinishTime = 0;
   private RMAppAttempt currentAttempt;
   private String queue;
   @SuppressWarnings("rawtypes")
@@ -111,8 +110,11 @@ public class RMAppImpl implements RMApp,
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
-  private boolean isAppRemovalRequestSent = false;
-  private RMAppState previousStateAtRemoving;
+  private RMAppState stateBeforeFinalSaving;
+  private RMAppEvent eventCausingFinalSaving;
+  private RMAppState targetedFinalState;
+  private RMAppState recoveredFinalState;
+  Object transitionTodo;
 
   private static final StateMachineFactory<RMAppImpl,
                                            RMAppState,
@@ -129,32 +131,45 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppSavingTransition())
-    .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
-        RMAppEventType.RECOVER, new StartAppAttemptTransition())
-    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
-        new AppKilledTransition())
-    .addTransition(RMAppState.NEW, RMAppState.FAILED,
-        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
+            RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
+            RMAppState.FINAL_SAVING),
+        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
+    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
+        RMAppEventType.APP_REJECTED,
+        new FinalSavingTransition(
+          new AppRejectedTransition(), RMAppState.FAILED))
 
     // Transitions from NEW_SAVING state
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
-        RMAppEventType.APP_SAVED, new StartAppAttemptTransition())
-    .addTransition(RMAppState.NEW_SAVING, RMAppState.KILLED,
-        RMAppEventType.KILL, new AppKilledTransition())
-    .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED,
-        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+        RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition())
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.APP_REJECTED,
+          new FinalSavingTransition(new AppRejectedTransition(),
+            RMAppState.FAILED))
 
      // Transitions from SUBMITTED state
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
-        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+        RMAppEventType.APP_REJECTED,
+        new FinalSavingTransition(
+          new AppRejectedTransition(), RMAppState.FAILED))
     .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
         RMAppEventType.APP_ACCEPTED)
-    .addTransition(RMAppState.SUBMITTED, RMAppState.KILLED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new KillAppAndAttemptTransition(), RMAppState.KILLED))
 
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -162,37 +177,45 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
-        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
+        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new KillAppAndAttemptTransition(), RMAppState.KILLED))
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
-          RMAppEventType.ATTEMPT_UNREGISTERED,
-        new RMAppRemovingTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_UNREGISTERED,
+        new FinalSavingTransition(
+          new AttemptUnregisteredTransition(),
+          RMAppState.FINISHING, RMAppState.FINISHED))
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
+      // An unmanaged AM jumps directly to FINISHED
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
     .addTransition(RMAppState.RUNNING,
-        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
+        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
-
-     // Transitions from REMOVING state
-    .addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
-        RMAppEventType.APP_REMOVED,  new RMAppFinishingTransition())
-    .addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
-        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
-    .addTransition(RMAppState.REMOVING, RMAppState.KILLED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+
+     // Transitions from FINAL_SAVING state
+    .addTransition(RMAppState.FINAL_SAVING,
+      EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED,
+        RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED,
+        new FinalStateSavedTransition())
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_FINISHED,
+        new AttemptFinishedAtFinalSavingTransition())
     // ignorable transitions
-    .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
-        RMAppEventType.NODE_UPDATE)
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+        EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL))
 
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@@ -201,7 +224,7 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-      EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
+      EnumSet.of(RMAppEventType.NODE_UPDATE))
 
      // Transitions from FINISHED state
      // ignorable transitions
@@ -210,14 +233,12 @@ public class RMAppImpl implements RMApp,
             RMAppEventType.NODE_UPDATE,
             RMAppEventType.ATTEMPT_UNREGISTERED,
             RMAppEventType.ATTEMPT_FINISHED,
-            RMAppEventType.KILL,
-            RMAppEventType.APP_REMOVED))
+            RMAppEventType.KILL))
 
      // Transitions from FAILED state
      // ignorable transitions
     .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
-          RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
 
      // Transitions from KILLED state
      // ignorable transitions
@@ -227,8 +248,7 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE,
-            RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
+            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
 
      .installTopology();
 
@@ -316,9 +336,8 @@ public class RMAppImpl implements RMApp,
   @Override
   public RMAppState getState() {
     this.readLock.lock();
-
     try {
-      return this.stateMachine.getCurrentState();
+        return this.stateMachine.getCurrentState();
     } finally {
       this.readLock.unlock();
     }
@@ -398,7 +417,7 @@ public class RMAppImpl implements RMApp,
     case SUBMITTED:
     case ACCEPTED:
     case RUNNING:
-    case REMOVING:
+    case FINAL_SAVING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
     case FINISHING:
@@ -586,8 +605,12 @@ public class RMAppImpl implements RMApp,
   @Override
   public void recover(RMState state) throws Exception{
     ApplicationState appState = state.getApplicationState().get(getApplicationId());
+    this.recoveredFinalState = appState.getState();
     LOG.info("Recovering app: " + getApplicationId() + " with " + 
-            + appState.getAttemptCount() + " attempts");
+        + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState );
+    this.diagnostics.append(appState.getDiagnostics());
+    this.storedFinishTime = appState.getFinishTime();
+    this.startTime = appState.getStartTime();
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
       createNewAttempt(false);
@@ -632,60 +655,195 @@ public class RMAppImpl implements RMApp,
           nodeUpdateEvent.getNode());
     };
   }
-  
+
+  private static final class RMAppRecoveredTransition implements
+      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+    @Override
+    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+
+      if (app.recoveredFinalState != null) {
+        FINAL_TRANSITION.transition(app, event);
+        return app.recoveredFinalState;
+      }
+      // Directly call AttemptFailedTransition, since an application that fails
+      // because of an RM restart is now treated as a normal AM failure.
+
+      // Do not recover unmanaged applications since current recovery 
+      // mechanism of restarting attempts does not work for them.
+      // This will need to be changed in work preserving recovery in which 
+      // RM will re-connect with the running AM's instead of restarting them
+
+      // In work-preserving restart, if attemptCount == maxAttempts, the job still
+      // needs to be recovered because the last attempt may still be running.
+
+      // As part of YARN-1210, we may return the ACCEPTED state, waiting for the AM
+      // to re-register or fail, and remove the following code.
+      return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
+        event);
+    }
+  }
+
   private static final class StartAppAttemptTransition extends RMAppTransition {
+    @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      if (event.getType().equals(RMAppEventType.APP_SAVED)) {
-        assert app.getState().equals(RMAppState.NEW_SAVING);
-        RMAppStoredEvent storeEvent = (RMAppStoredEvent) event;
-        if(storeEvent.getStoredException() != null) {
-          // For HA this exception needs to be handled by giving up
-          // master status if we got fenced
-          LOG.error("Failed to store application: "
-              + storeEvent.getApplicationId(),
-              storeEvent.getStoredException());
-          ExitUtil.terminate(1, storeEvent.getStoredException());
-        }
+      RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
+      if (storeEvent.getStoredException() != null) {
+        // For HA this exception needs to be handled by giving up
+        // master status if we got fenced
+        LOG.error(
+          "Failed to store application: " + storeEvent.getApplicationId(),
+          storeEvent.getStoredException());
+        ExitUtil.terminate(1, storeEvent.getStoredException());
       }
-
       app.createNewAttempt(true);
     };
   }
 
-  private static final class RMAppFinishingTransition extends RMAppTransition {
+  private static final class FinalStateSavedTransition implements
+      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+      RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
+      if (storeEvent.getUpdatedException() != null) {
+        LOG.error("Failed to update the final state of application"
+              + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
+        ExitUtil.terminate(1, storeEvent.getUpdatedException());
+      }
+
+      if (app.transitionTodo instanceof SingleArcTransition) {
+        ((SingleArcTransition) app.transitionTodo).transition(app,
+          app.eventCausingFinalSaving);
+      } else if (app.transitionTodo instanceof MultipleArcTransition) {
+        ((MultipleArcTransition) app.transitionTodo).transition(app,
+          app.eventCausingFinalSaving);
+      }
+      return app.targetedFinalState;
+
+    }
+  }
+
+  private static class AttemptFailedFinalStateSavedTransition extends
+      RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      if (event.getType().equals(RMAppEventType.APP_REMOVED)) {
-        RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event;
-        if (removeEvent.getRemovedException() != null) {
-          LOG.error(
-            "Failed to remove application: " + removeEvent.getApplicationId(),
-            removeEvent.getRemovedException());
-          ExitUtil.terminate(1, removeEvent.getRemovedException());
-        }
+      String msg = null;
+      if (event instanceof RMAppFailedAttemptEvent) {
+        msg = app.getAppAttemptFailedDiagnostics(event);
       }
-      app.finishTime = System.currentTimeMillis();
+      LOG.info(msg);
+      app.diagnostics.append(msg);
+      // Inform the node for app-finish
+      FINAL_TRANSITION.transition(app, event);
+    }
+  }
+
+  private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
+    String msg = null;
+    RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+    if (this.submissionContext.getUnmanagedAM()) {
+      // RM does not manage the AM. Do not retry
+      msg = "Unmanaged application " + this.getApplicationId()
+              + " failed due to " + failedEvent.getDiagnostics()
+              + ". Failing the application.";
+    } else if (this.attempts.size() >= this.maxAppAttempts) {
+      msg = "Application " + this.getApplicationId() + " failed "
+              + this.maxAppAttempts + " times due to "
+              + failedEvent.getDiagnostics() + ". Failing the application.";
     }
+    return msg;
   }
 
   private static final class RMAppSavingTransition extends RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
+
       // If recovery is enabled then store the application information in a
       // non-blocking call so make sure that RM has stored the information
       // needed to restart the AM after RM restart without further client
       // communication
       LOG.info("Storing application with id " + app.applicationId);
-      app.rmContext.getStateStore().storeApplication(app);
+      app.rmContext.getStateStore().storeNewApplication(app);
     }
   }
 
-  private static final class RMAppRemovingTransition extends RMAppTransition {
+  private void rememberTargetTransitions(RMAppEvent event,
+      Object transitionToDo, RMAppState targetFinalState) {
+    transitionTodo = transitionToDo;
+    targetedFinalState = targetFinalState;
+    eventCausingFinalSaving = event;
+  }
+
+  private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
+      Object transitionToDo, RMAppState targetFinalState,
+      RMAppState stateToBeStored) {
+    rememberTargetTransitions(event, transitionToDo, targetFinalState);
+    this.stateBeforeFinalSaving = getState();
+    this.storedFinishTime = System.currentTimeMillis();
+
+    LOG.info("Updating application " + this.applicationId
+        + " with final state: " + this.targetedFinalState);
+    // We lose the ATTEMPT_FINISHED diagnostics in the app, because they are
+    // sent after the app's final state has been saved. Later on, we will
+    // create GetApplicationAttemptReport specifically for getting per-attempt
+    // info.
+    String diags = null;
+    switch (event.getType()) {
+    case APP_REJECTED:
+      RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event;
+      diags = rejectedEvent.getMessage();
+      break;
+    case ATTEMPT_FINISHED:
+      RMAppFinishedAttemptEvent finishedEvent =
+          (RMAppFinishedAttemptEvent) event;
+      diags = finishedEvent.getDiagnostics();
+      break;
+    case ATTEMPT_FAILED:
+      RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+      diags = getAppAttemptFailedDiagnostics(failedEvent);
+      break;
+    case KILL:
+      diags = getAppKilledDiagnostics();
+      break;
+    default:
+      break;
+    }
+    ApplicationState appState =
+        new ApplicationState(this.submitTime, this.startTime,
+          this.submissionContext, this.user, stateToBeStored, diags,
+          this.storedFinishTime);
+    this.rmContext.getStateStore().updateApplicationState(appState);
+  }
+
+  private static final class FinalSavingTransition extends RMAppTransition {
+    Object transitionToDo;
+    RMAppState targetedFinalState;
+    RMAppState stateToBeStored;
+
+    public FinalSavingTransition(Object transitionToDo,
+        RMAppState targetedFinalState) {
+      this(transitionToDo, targetedFinalState, targetedFinalState);
+    }
+
+    public FinalSavingTransition(Object transitionToDo,
+        RMAppState targetedFinalState, RMAppState stateToBeStored) {
+      this.transitionToDo = transitionToDo;
+      this.targetedFinalState = targetedFinalState;
+      this.stateToBeStored = stateToBeStored;
+    }
+
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      LOG.info("Removing application with id " + app.applicationId);
-      app.removeApplicationState();
-      app.previousStateAtRemoving = app.getState();
+      app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
+        targetedFinalState, stateToBeStored);
+    }
+  }
+
+  private static class AttemptUnregisteredTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      app.finishTime = app.storedFinishTime;
     }
   }
 
@@ -698,6 +856,40 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  private static class AttemptFinishedAtFinalSavingTransition extends
+      RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      if (app.targetedFinalState.equals(RMAppState.FAILED)
+          || app.targetedFinalState.equals(RMAppState.KILLED)) {
+        // Ignore the ATTEMPT_FINISHED event if we were supposed to reach the
+        // FAILED or KILLED state
+        return;
+      }
+
+      // pass in the earlier attempt_unregistered event, as it is needed in
+      // AppFinishedFinalStateSavedTransition later on
+      app.rememberTargetTransitions(event,
+        new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving),
+        RMAppState.FINISHED);
+    };
+  }
+
+  private static class AppFinishedFinalStateSavedTransition extends
+      RMAppTransition {
+    RMAppEvent attemptUnregistered;
+
+    public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) {
+      this.attemptUnregistered = attemptUnregistered;
+    }
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      new AttemptUnregisteredTransition().transition(app, attemptUnregistered);
+      FINISHED_TRANSITION.transition(app, event);
+    };
+  }
+
+
   private static class AppKilledTransition extends FinalTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
@@ -706,6 +898,10 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  private static String getAppKilledDiagnostics() {
+    return "Application killed by user.";
+  }
+
   private static class KillAppAndAttemptTransition extends AppKilledTransition {
     @SuppressWarnings("unchecked")
     @Override
@@ -741,12 +937,10 @@ public class RMAppImpl implements RMApp,
         app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
       }
-      if (app.getState() != RMAppState.FINISHING) {
+      app.finishTime = app.storedFinishTime;
+      if (app.finishTime == 0 ) {
         app.finishTime = System.currentTimeMillis();
       }
-      // application completely done and remove from state store.
-      app.removeApplicationState();
-
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -764,32 +958,15 @@ public class RMAppImpl implements RMApp,
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-
-      RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event);
-      boolean retryApp = true;
-      String msg = null;
-      if (app.submissionContext.getUnmanagedAM()) {
-        // RM does not manage the AM. Do not retry
-        retryApp = false;
-        msg = "Unmanaged application " + app.getApplicationId()
-            + " failed due to " + failedEvent.getDiagnostics()
-            + ". Failing the application.";
-      } else if (app.attempts.size() >= app.maxAppAttempts) {
-        retryApp = false;
-        msg = "Application " + app.getApplicationId() + " failed "
-            + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics()
-            + ". Failing the application.";
-      }
-
-      if (retryApp) {
+      if (!app.submissionContext.getUnmanagedAM()
+          && app.attempts.size() < app.maxAppAttempts) {
         app.createNewAttempt(true);
         return initialState;
       } else {
-        LOG.info(msg);
-        app.diagnostics.append(msg);
-        // Inform the node for app-finish
-        FINAL_TRANSITION.transition(app, event);
-        return RMAppState.FAILED;
+        app.rememberTargetTransitionsAndStoreState(event,
+          new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
+          RMAppState.FAILED);
+        return RMAppState.FINAL_SAVING;
       }
     }
 
@@ -814,9 +991,9 @@ public class RMAppImpl implements RMApp,
   @Override
   public YarnApplicationState createApplicationState() {
     RMAppState rmAppState = getState();
-    // If App is in REMOVING state, return its previous state.
-    if (rmAppState.equals(RMAppState.REMOVING)) {
-      rmAppState = previousStateAtRemoving;
+    // If App is in FINAL_SAVING state, return its previous state.
+    if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
+      rmAppState = stateBeforeFinalSaving;
     }
     switch (rmAppState) {
     case NEW:
@@ -840,11 +1017,4 @@ public class RMAppImpl implements RMApp,
       throw new YarnRuntimeException("Unknown state passed!");
     }
   }
-
-  private void removeApplicationState(){
-    if (!isAppRemovalRequestSent) {
-      rmContext.getStateStore().removeApplication(this);
-      isAppRemovalRequestSent = true;
-    }
-  }
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNewSavedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNewSavedEvent.java?rev=1537560&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNewSavedEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNewSavedEvent.java Thu Oct 31 17:25:06 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class RMAppNewSavedEvent extends RMAppEvent {
+
+  private final Exception storedException;
+
+  public RMAppNewSavedEvent(ApplicationId appId, Exception storedException) {
+    super(appId, RMAppEventType.APP_NEW_SAVED);
+    this.storedException = storedException;
+  }
+
+  public Exception getStoredException() {
+    return storedException;
+  }
+
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Thu Oct 31 17:25:06 2013
@@ -24,7 +24,7 @@ public enum RMAppState {
   SUBMITTED,
   ACCEPTED,
   RUNNING,
-  REMOVING,
+  FINAL_SAVING,
   FINISHING,
   FINISHED,
   FAILED,

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateSavedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateSavedEvent.java?rev=1537560&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateSavedEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateSavedEvent.java Thu Oct 31 17:25:06 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * An {@link RMAppEvent} of type {@link RMAppEventType#APP_UPDATE_SAVED} that
+ * reports the outcome of saving an application update. It is presumably sent
+ * by the state store once the write finishes (confirm against callers).
+ */
+public class RMAppUpdateSavedEvent extends RMAppEvent {
+
+  /**
+   * Exception raised while saving the update; may be {@code null}. Presumably
+   * {@code null} means the save succeeded - confirm with the producer.
+   */
+  private final Exception updatedException;
+
+  /**
+   * @param appId the application whose update this event reports
+   * @param updatedException the failure while saving, or {@code null}
+   */
+  public RMAppUpdateSavedEvent(ApplicationId appId,
+      Exception updatedException) {
+    super(appId, RMAppEventType.APP_UPDATE_SAVED);
+    this.updatedException = updatedException;
+  }
+
+  /** @return the exception recorded for the save, or {@code null} */
+  public Exception getUpdatedException() {
+    return updatedException;
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Thu Oct 31 17:25:06 2013
@@ -41,7 +41,8 @@ public enum RMAppAttemptEventType {
   CONTAINER_FINISHED,
   
   // Source: RMStateStore
-  ATTEMPT_SAVED,
+  ATTEMPT_NEW_SAVED,
+  ATTEMPT_UPDATE_SAVED,
 
   // Source: Scheduler
   APP_REJECTED,

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Thu Oct 31 17:25:06 2013
@@ -40,7 +40,6 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -82,8 +81,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
@@ -142,7 +142,7 @@ public class RMAppAttemptImpl implements
   private float progress = 0;
   private String host = "N/A";
   private int rpcPort;
-  private String origTrackingUrl = "N/A";
+  private String originalTrackingUrl = "N/A";
   private String proxiedTrackingUrl = "N/A";
   private long startTime = 0;
 
@@ -157,6 +157,11 @@ public class RMAppAttemptImpl implements
   private static final ExpiredTransition EXPIRED_TRANSITION =
       new ExpiredTransition();
 
+  private RMAppAttemptEvent eventCausingFinalSaving;
+  private RMAppAttemptState targetedFinalState;
+  private RMAppAttemptState recoveredFinalState;
+  private Object transitionTodo;
+
   private static final StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
                                            RMAppAttemptEventType,
@@ -169,68 +174,80 @@ public class RMAppAttemptImpl implements
        // Transitions from NEW State
       .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
           RMAppAttemptEventType.START, new AttemptStartedTransition())
-      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.KILLED,
+      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
-      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.REGISTERED,
-          new UnexpectedAMRegisteredTransition())
-      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED, 
-          RMAppAttemptEventType.RECOVER)
+          new FinalSavingTransition(
+            new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
+      .addTransition( RMAppAttemptState.NEW,
+          EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
+            RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
+          RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
           
       // Transitions from SUBMITTED state
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
-          RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
+      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.APP_REJECTED,
+          new FinalSavingTransition(new AppRejectedTransition(),
+            RMAppAttemptState.FAILED))
       .addTransition(RMAppAttemptState.SUBMITTED, 
           EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                      RMAppAttemptState.SCHEDULED),
           RMAppAttemptEventType.APP_ACCEPTED, 
           new ScheduleTransition())
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
+      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.REGISTERED,
-          new UnexpectedAMRegisteredTransition())
+          new FinalSavingTransition(
+            new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
           
        // Transitions from SCHEDULED State
       .addTransition(RMAppAttemptState.SCHEDULED,
                      RMAppAttemptState.ALLOCATED_SAVING,
           RMAppAttemptEventType.CONTAINER_ALLOCATED,
           new AMContainerAllocatedTransition())
-      .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
+      .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
           
        // Transitions from ALLOCATED_SAVING State
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
           RMAppAttemptState.ALLOCATED,
-          RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
+          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
           RMAppAttemptState.ALLOCATED_SAVING,
           RMAppAttemptEventType.CONTAINER_ACQUIRED, 
           new ContainerAcquiredTransition())
        // App could be killed by the client. So need to handle this. 
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
-          RMAppAttemptState.KILLED,
+          RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
-      
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+
        // Transitions from LAUNCHED_UNMANAGED_SAVING State
       .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
           RMAppAttemptState.LAUNCHED,
-          RMAppAttemptEventType.ATTEMPT_SAVED, 
+          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, 
           new UnmanagedAMAttemptSavedTransition())
       // attempt should not try to register in this state
       .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
-          RMAppAttemptState.FAILED,
+          RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.REGISTERED,
-          new UnexpectedAMRegisteredTransition())
+          new FinalSavingTransition(
+            new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
       // App could be killed by the client. So need to handle this. 
       .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
-          RMAppAttemptState.KILLED,
+          RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
 
        // Transitions from ALLOCATED State
       .addTransition(RMAppAttemptState.ALLOCATED,
@@ -239,32 +256,40 @@ public class RMAppAttemptImpl implements
           new ContainerAcquiredTransition())
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
           RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
-      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
-          RMAppAttemptEventType.LAUNCH_FAILED, new LaunchFailedTransition())
-      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.KILLED,
-          RMAppAttemptEventType.KILL, new KillAllocatedAMTransition())
+      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.LAUNCH_FAILED,
+          new FinalSavingTransition(new LaunchFailedTransition(),
+            RMAppAttemptState.FAILED))
+      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.KILL,
+          new FinalSavingTransition(
+            new KillAllocatedAMTransition(), RMAppAttemptState.KILLED))
           
-      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
+      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new AMContainerCrashedTransition())
+          new FinalSavingTransition(
+            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
 
        // Transitions from LAUNCHED State
       .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
-      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
+      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new AMContainerCrashedTransition())
+          new FinalSavingTransition(
+            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
       .addTransition(
-          RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
+          RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
-          EXPIRED_TRANSITION)
-      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.KILLED,
+          new FinalSavingTransition(EXPIRED_TRANSITION,
+            RMAppAttemptState.FAILED))
+      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new FinalTransition(RMAppAttemptState.KILLED))
+          new FinalSavingTransition(new FinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
 
        // Transitions from RUNNING State
       .addTransition(RMAppAttemptState.RUNNING,
-          EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
+          EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED),
           RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
@@ -276,17 +301,41 @@ public class RMAppAttemptImpl implements
                 new ContainerAcquiredTransition())
       .addTransition(
           RMAppAttemptState.RUNNING,
-          EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED),
+          EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.CONTAINER_FINISHED,
           new ContainerFinishedTransition())
       .addTransition(
-          RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED,
+          RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
-          EXPIRED_TRANSITION)
+          new FinalSavingTransition(EXPIRED_TRANSITION,
+            RMAppAttemptState.FAILED))
       .addTransition(
-          RMAppAttemptState.RUNNING, RMAppAttemptState.KILLED,
+          RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new FinalTransition(RMAppAttemptState.KILLED))
+          new FinalSavingTransition(new FinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+
+       // Transitions from FINAL_SAVING State
+      .addTransition(RMAppAttemptState.FINAL_SAVING,
+          EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FAILED,
+            RMAppAttemptState.KILLED, RMAppAttemptState.FINISHED),
+            RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED,
+            new FinalStateSavedTransition())
+      .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new ContainerFinishedAtFinalSavingTransition())
+      .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.EXPIRE,
+          new AMExpiredAtFinalSavingTransition())
+      .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+          EnumSet.of(
+              RMAppAttemptEventType.UNREGISTERED,
+              RMAppAttemptEventType.STATUS_UPDATE,
+            // should be fixed to reject container allocate request at Final
+            // Saving in scheduler
+              RMAppAttemptEventType.CONTAINER_ALLOCATED,
+              RMAppAttemptEventType.CONTAINER_ACQUIRED,
+              RMAppAttemptEventType.KILL))
 
       // Transitions from FAILED State
       .addTransition(
@@ -338,7 +387,6 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.REGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.ATTEMPT_SAVED,
               RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
@@ -357,7 +405,7 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.REGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
               RMAppAttemptEventType.CONTAINER_ACQUIRED,
-              RMAppAttemptEventType.ATTEMPT_SAVED,
+              RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
               RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
@@ -411,7 +459,7 @@ public class RMAppAttemptImpl implements
   public RMAppAttemptState getAppAttemptState() {
     this.readLock.lock();
     try {
-      return this.stateMachine.getCurrentState();
+        return this.stateMachine.getCurrentState();
     } finally {
       this.readLock.unlock();
     }
@@ -444,7 +492,7 @@ public class RMAppAttemptImpl implements
     this.readLock.lock();
     try {
       return (getSubmissionContext().getUnmanagedAM()) ? 
-              this.origTrackingUrl : this.proxiedTrackingUrl;
+              this.originalTrackingUrl : this.proxiedTrackingUrl;
     } finally {
       this.readLock.unlock();
     }
@@ -454,7 +502,7 @@ public class RMAppAttemptImpl implements
   public String getOriginalTrackingUrl() {
     this.readLock.lock();
     try {
-      return this.origTrackingUrl;
+      return this.originalTrackingUrl;
     } finally {
       this.readLock.unlock();
     }    
@@ -490,10 +538,10 @@ public class RMAppAttemptImpl implements
   }
 
   private void setTrackingUrlToRMAppPage() {
-    origTrackingUrl = pjoin(
+    originalTrackingUrl = pjoin(
         WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
         "cluster", "app", getAppAttemptId().getApplicationId());
-    proxiedTrackingUrl = origTrackingUrl;
+    proxiedTrackingUrl = originalTrackingUrl;
   }
 
   // This is only used for RMStateStore. Normal operation must invoke the secret
@@ -539,16 +587,6 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  public void setDiagnostics(String message) {
-    this.writeLock.lock();
-
-    try {
-      this.diagnostics.append(message);
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
   @Override
   public float getProgress() {
     this.readLock.lock();
@@ -673,19 +711,28 @@ public class RMAppAttemptImpl implements
   }
 
   @Override
-  public void recover(RMState state) throws Exception{
-    ApplicationState appState = 
+  public void recover(RMState state) throws Exception {
+    ApplicationState appState =
         state.getApplicationState().get(getAppAttemptId().getApplicationId());
-    ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
+    ApplicationAttemptState attemptState =
+        appState.getAttempt(getAppAttemptId());
     assert attemptState != null;
+    diagnostics.append("Attempt recovered after RM restart");
+    diagnostics.append(attemptState.getDiagnostics());
     setMasterContainer(attemptState.getMasterContainer());
     recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
-    LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
-             + " AttemptId: " + getAppAttemptId()
-             + " MasterContainer: " + masterContainer);
-    setDiagnostics("Attempt recovered after RM restart");
-    handle(new RMAppAttemptEvent(getAppAttemptId(), 
-                                 RMAppAttemptEventType.RECOVER));
+    // Log after setMasterContainer(), as the code being replaced did, so the
+    // logged masterContainer reflects the value recovered above.
+    LOG.info("Recovered attempt: AppId: "
+        + getAppAttemptId().getApplicationId() + " AttemptId: "
+        + getAppAttemptId() + " MasterContainer: " + masterContainer);
+    this.recoveredFinalState = attemptState.getState();
+    this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
+    this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
+    this.finalStatus = attemptState.getFinalApplicationStatus();
+    this.startTime = attemptState.getStartTime();
+    handle(new RMAppAttemptEvent(getAppAttemptId(),
+      RMAppAttemptEventType.RECOVER));
   }
 
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -763,7 +808,7 @@ public class RMAppAttemptImpl implements
       
       // Save the diagnostic message
       String message = rejectedEvent.getMessage();
-      appAttempt.setDiagnostics(message);
+      appAttempt.diagnostics.append(message);
 
       // Send the rejection event to app
       appAttempt.eventHandler.handle(
@@ -810,10 +855,8 @@ public class RMAppAttemptImpl implements
         }
         return RMAppAttemptState.SCHEDULED;
       } else {
-        // RM not allocating container. AM is self launched. 
-        RMStateStore store = appAttempt.rmContext.getStateStore();
         // save state and then go to LAUNCHED state
-        appAttempt.storeAttempt(store);
+        appAttempt.storeAttempt();
         return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
       }
     }
@@ -838,8 +881,7 @@ public class RMAppAttemptImpl implements
                                                                            0));
       appAttempt.getSubmissionContext().setResource(
           appAttempt.getMasterContainer().getResource());
-      RMStateStore store = appAttempt.rmContext.getStateStore();
-      appAttempt.storeAttempt(store);
+      appAttempt.storeAttempt();
     }
   }
   
@@ -851,6 +893,176 @@ public class RMAppAttemptImpl implements
       appAttempt.launchAttempt();
     }
   }
+
+  /**
+   * Handles RECOVER for an attempt restored from the state store. If a final
+   * state was recovered, the attempt moves straight to it with progress set
+   * to 1.0; otherwise it moves to RECOVERED.
+   */
+  private static class AttemptRecoveredTransition implements
+      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent,
+          RMAppAttemptState> {
+    @Override
+    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      if (appAttempt.recoveredFinalState != null) {
+        appAttempt.progress = 1.0f;
+        return appAttempt.recoveredFinalState;
+      } else {
+        return RMAppAttemptState.RECOVERED;
+      }
+    }
+  }
+
+  /**
+   * Records what to do once the final state has been saved: the event that
+   * triggered the save, the transition to replay and the state to end in.
+   */
+  private void rememberTargetTransitions(RMAppAttemptEvent event,
+      Object transitionToDo, RMAppAttemptState targetFinalState) {
+    transitionTodo = transitionToDo;
+    targetedFinalState = targetFinalState;
+    eventCausingFinalSaving = event;
+  }
+
+  /**
+   * Remembers the deferred transition and asks the state store to persist
+   * this attempt in stateToBeStored, together with the diagnostics, final
+   * tracking URL and final application status derived from the event.
+   *
+   * @param event the event that moved the attempt to FINAL_SAVING
+   * @param transitionToDo transition to run once the update is saved
+   * @param targetFinalState state to enter once the update is saved
+   * @param stateToBeStored attempt state written to the state store
+   */
+  private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
+      Object transitionToDo, RMAppAttemptState targetFinalState,
+      RMAppAttemptState stateToBeStored) {
+
+    rememberTargetTransitions(event, transitionToDo, targetFinalState);
+
+    // As of today, finalState, diagnostics, final-tracking-url and
+    // finalAppStatus are the only things that we store into the StateStore
+    // AFTER the initial saving on app-attempt-start.
+    // These fields can be visible from outside only after they are saved in
+    // StateStore.
+    String diags = null;
+    String finalTrackingUrl = null;
+    FinalApplicationStatus finalStatus = null;
+
+    switch (event.getType()) {
+    case APP_REJECTED:
+      RMAppAttemptRejectedEvent rejectedEvent =
+          (RMAppAttemptRejectedEvent) event;
+      diags = rejectedEvent.getMessage();
+      break;
+    case LAUNCH_FAILED:
+      RMAppAttemptLaunchFailedEvent launchFailedEvent =
+          (RMAppAttemptLaunchFailedEvent) event;
+      diags = launchFailedEvent.getMessage();
+      break;
+    case REGISTERED:
+      diags = getUnexpectedAMRegisteredDiagnostics();
+      break;
+    case UNREGISTERED:
+      RMAppAttemptUnregistrationEvent unregisterEvent =
+          (RMAppAttemptUnregistrationEvent) event;
+      diags = unregisterEvent.getDiagnostics();
+      finalTrackingUrl =
+          sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
+      finalStatus = unregisterEvent.getFinalApplicationStatus();
+      break;
+    case CONTAINER_FINISHED:
+      RMAppAttemptContainerFinishedEvent finishEvent =
+          (RMAppAttemptContainerFinishedEvent) event;
+      diags = getAMContainerCrashedDiagnostics(finishEvent);
+      break;
+    case KILL:
+      // No diagnostics, tracking URL or final status are recorded for KILL.
+      break;
+    case EXPIRE:
+      diags = getAMExpiredDiagnostics(event);
+      break;
+    default:
+      break;
+    }
+
+    RMStateStore rmStore = rmContext.getStateStore();
+    ApplicationAttemptState attemptState =
+        new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
+          rmStore.getCredentialsFromAppAttempt(this), startTime,
+          stateToBeStored, finalTrackingUrl, diags, finalStatus);
+    LOG.info("Updating application attempt " + applicationAttemptId
+        + " with final state: " + targetedFinalState);
+    rmStore.updateApplicationAttemptState(attemptState);
+  }
+
+  /**
+   * Enters FINAL_SAVING: rather than running transitionToDo immediately, it
+   * is remembered along with targetedFinalState and the final state is sent
+   * to the state store. FinalStateSavedTransition runs it later, on
+   * ATTEMPT_UPDATE_SAVED.
+   */
+  private static class FinalSavingTransition extends BaseTransition {
+
+    private final Object transitionToDo;
+    private final RMAppAttemptState targetedFinalState;
+
+    public FinalSavingTransition(Object transitionToDo,
+        RMAppAttemptState targetedFinalState) {
+      this.transitionToDo = transitionToDo;
+      this.targetedFinalState = targetedFinalState;
+    }
+
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      // For cases Killed/Failed, targetedFinalState is the same as the state to
+      // be stored
+      appAttempt.rememberTargetTransitionsAndStoreState(event, transitionToDo,
+        targetedFinalState, targetedFinalState);
+    }
+  }
+
+  /**
+   * Handles ATTEMPT_UPDATE_SAVED in FINAL_SAVING. A failed store update is
+   * treated as fatal (ExitUtil.terminate); otherwise the remembered transition
+   * is replayed against the event that caused the final saving and the
+   * attempt moves to the remembered target state.
+   */
+  private static class FinalStateSavedTransition implements
+      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent,
+          RMAppAttemptState> {
+    // Unchecked: transitionTodo is held as Object. The instanceof checks guard
+    // the interface type; the type arguments assume it is one of this class's
+    // RMAppAttemptImpl/RMAppAttemptEvent transitions.
+    @Override
+    @SuppressWarnings("unchecked")
+    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      RMAppAttemptUpdateSavedEvent storeEvent =
+          (RMAppAttemptUpdateSavedEvent) event;
+      if (storeEvent.getUpdatedException() != null) {
+        LOG.error("Failed to update the final state of application attempt: "
+            + storeEvent.getApplicationAttemptId(),
+          storeEvent.getUpdatedException());
+        ExitUtil.terminate(1, storeEvent.getUpdatedException());
+      }
+
+      RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
+
+      if (appAttempt.transitionTodo instanceof SingleArcTransition) {
+        ((SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent>)
+            appAttempt.transitionTodo).transition(appAttempt, causeEvent);
+      } else if (appAttempt.transitionTodo instanceof MultipleArcTransition) {
+        // The returned state is ignored; targetedFinalState decides the arc.
+        ((MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent,
+            RMAppAttemptState>) appAttempt.transitionTodo).transition(
+            appAttempt, causeEvent);
+      }
+      return appAttempt.targetedFinalState;
+    }
+  }
   
   private static class BaseFinalTransition extends BaseTransition {
 
@@ -998,15 +1168,20 @@ public class RMAppAttemptImpl implements
           = (RMAppAttemptRegistrationEvent) event;
       appAttempt.host = registrationEvent.getHost();
       appAttempt.rpcPort = registrationEvent.getRpcport();
-      appAttempt.origTrackingUrl =
+      appAttempt.originalTrackingUrl =
           sanitizeTrackingUrl(registrationEvent.getTrackingurl());
       appAttempt.proxiedTrackingUrl = 
-        appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
+        appAttempt.generateProxyUriWithScheme(appAttempt.originalTrackingUrl);
 
       // Let the app know
       appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
           .getAppAttemptId().getApplicationId(),
           RMAppEventType.ATTEMPT_REGISTERED));
+
+      // TODO:FIXME: Note for future. Unfortunately we only do a state-store
+      // write at AM launch time, so we don't save the AM's tracking URL anywhere
+      // as that would mean an extra state-store write. For now, we hope that in
+      // work-preserving restart, AMs are forced to reregister.
     }
   }
 
@@ -1029,17 +1204,24 @@ public class RMAppAttemptImpl implements
           appAttempt.getAppAttemptId());
 
       // Setup diagnostic message
-      ContainerStatus status = finishEvent.getContainerStatus();
-      appAttempt.diagnostics.append("AM Container for " +
-          appAttempt.getAppAttemptId() + " exited with " +
-          " exitCode: " + status.getExitStatus() +
-          " due to: " +  status.getDiagnostics() + "." +
-          "Failing this attempt.");
+      appAttempt.diagnostics
+        .append(getAMContainerCrashedDiagnostics(finishEvent));
       // Tell the app, scheduler
       super.transition(appAttempt, finishEvent);
     }
   }
 
+  private static String getAMContainerCrashedDiagnostics(
+      RMAppAttemptContainerFinishedEvent finishEvent) {
+    ContainerStatus status = finishEvent.getContainerStatus();
+    String diagnostics =
+        "AM Container for " + finishEvent.getApplicationAttemptId()
+            + " exited with " + " exitCode: " + status.getExitStatus()
+            + " due to: " + status.getDiagnostics() + "."
+            + "Failing this attempt.";
+    return diagnostics;
+  }
+
   private static class FinalTransition extends BaseFinalTransition {
 
     public FinalTransition(RMAppAttemptState finalAttemptState) {
@@ -1055,7 +1237,8 @@ public class RMAppAttemptImpl implements
       // Tell the app and the scheduler
       super.transition(appAttempt, event);
 
-      // UnRegister from AMLivelinessMonitor
+      // UnRegister from AMLivelinessMonitor. Perhaps for
+      // FAILING/KILLED/UnManaged AMs
       appAttempt.rmContext.getAMLivelinessMonitor().unregister(
           appAttempt.getAppAttemptId());
       appAttempt.rmContext.getAMFinishingMonitor().unregister(
@@ -1078,12 +1261,18 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      appAttempt.diagnostics.append("ApplicationMaster for attempt " +
-        appAttempt.getAppAttemptId() + " timed out");
+      appAttempt.diagnostics.append(getAMExpiredDiagnostics(event));
       super.transition(appAttempt, event);
     }
   }
 
+  private static String getAMExpiredDiagnostics(RMAppAttemptEvent event) {
+    String diag =
+        "ApplicationMaster for attempt " + event.getApplicationAttemptId()
+            + " timed out";
+    return diag;
+  }
+
   private static class UnexpectedAMRegisteredTransition extends
       BaseFinalTransition {
 
@@ -1094,13 +1283,16 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
       assert appAttempt.submissionContext.getUnmanagedAM();
-      appAttempt
-          .setDiagnostics("Unmanaged AM must register after AM attempt reaches LAUNCHED state.");
+      appAttempt.diagnostics.append(getUnexpectedAMRegisteredDiagnostics());
       super.transition(appAttempt, event);
     }
 
   }
 
+  /** Diagnostics for an unmanaged AM that registers before LAUNCHED. */
+  private static final String UNEXPECTED_AM_REGISTERED_DIAGNOSTICS =
+      "Unmanaged AM must register after AM attempt reaches LAUNCHED state.";
+
+  /** @return the fixed diagnostics text for a premature unmanaged-AM register */
+  private static String getUnexpectedAMRegisteredDiagnostics() {
+    return UNEXPECTED_AM_REGISTERED_DIAGNOSTICS;
+  }
+
   private static final class StatusUpdateTransition extends
       BaseTransition {
     @Override
@@ -1125,38 +1317,62 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
+      // Unmanaged AMs record the unregistration details and finish right
+      // away. For managed AMs the final attempt state is stored first
+      // (targets FINISHING/FINISHED) and the attempt waits in FINAL_SAVING;
+      // the remembered FinalStateSavedAfterAMUnregisterTransition is expected
+      // to run once the save has been acknowledged.
-      ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
-
-      appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
-
-      appAttempt.progress = 1.0f;
-
-      RMAppAttemptUnregistrationEvent unregisterEvent
-        = (RMAppAttemptUnregistrationEvent) event;
-      appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
-      appAttempt.origTrackingUrl =
-          sanitizeTrackingUrl(unregisterEvent.getTrackingUrl());
-      appAttempt.proxiedTrackingUrl = 
-        appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
-      appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
-
       // Tell the app
       if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
         // Unmanaged AMs have no container to wait for, so they skip
         // the FINISHING state and go straight to FINISHED.
+        appAttempt.updateInfoOnAMUnregister(event);
         new FinalTransition(RMAppAttemptState.FINISHED).transition(
             appAttempt, event);
         return RMAppAttemptState.FINISHED;
       }
-      appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId);
+      // Saving the attempt final state
+      appAttempt.rememberTargetTransitionsAndStoreState(event,
+        new FinalStateSavedAfterAMUnregisterTransition(),
+        RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED);
       ApplicationId applicationId =
           appAttempt.getAppAttemptId().getApplicationId();
-      appAttempt.eventHandler.handle(
-          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
-      return RMAppAttemptState.FINISHING;
+
+      // Tell the app immediately that AM is unregistering so that app itself
+      // can save its state as soon as possible. Whether we do it like this, or
+      // we wait till AppAttempt is saved, it doesn't make any difference on the
+      // app side w.r.t failure conditions. The only event going out of
+      // AppAttempt to App after this point of time is AM/AppAttempt Finished.
+      appAttempt.eventHandler.handle(new RMAppEvent(applicationId,
+        RMAppEventType.ATTEMPT_UNREGISTERED));
+      return RMAppAttemptState.FINAL_SAVING;
+    }
+  }
+
+  /**
+   * Follow-up transition remembered by the AM-unregistered handling while the
+   * final attempt state is being saved. It moves the attempt from the
+   * liveliness monitor to the finishing monitor and records the
+   * unregistration details.
+   */
+  private static class FinalStateSavedAfterAMUnregisterTransition extends
+      BaseTransition {
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      ApplicationAttemptId attemptId = appAttempt.applicationAttemptId;
+      // Hand the attempt over from the liveliness monitor to the finishing
+      // monitor.
+      appAttempt.rmContext.getAMLivelinessMonitor().unregister(attemptId);
+      appAttempt.rmContext.getAMFinishingMonitor().register(attemptId);
+
+      // Keep this transition minimal: further bookkeeping belongs in
+      // updateInfoOnAMUnregister(), which other callers share, unless it
+      // must not apply to those callers.
+      appAttempt.updateInfoOnAMUnregister(event);
     }
   }
 
+  /**
+   * Records what the AM reported when unregistering. Progress is marked
+   * complete; the diagnostics, the sanitized final tracking URL, its proxied
+   * form and the final application status are taken from the event.
+   *
+   * @param event expected to be an {@link RMAppAttemptUnregistrationEvent}
+   */
+  private void updateInfoOnAMUnregister(RMAppAttemptEvent event) {
+    this.progress = 1.0f;
+    RMAppAttemptUnregistrationEvent unregistration =
+        (RMAppAttemptUnregistrationEvent) event;
+    this.diagnostics.append(unregistration.getDiagnostics());
+    this.originalTrackingUrl =
+        sanitizeTrackingUrl(unregistration.getFinalTrackingUrl());
+    this.proxiedTrackingUrl =
+        generateProxyUriWithScheme(this.originalTrackingUrl);
+    this.finalStatus = unregistration.getFinalApplicationStatus();
+  }
+
   private static final class ContainerAcquiredTransition extends
       BaseTransition {
     @Override
@@ -1185,29 +1401,37 @@ public class RMAppAttemptImpl implements
       // the AMContainer, AppAttempt fails
       if (appAttempt.masterContainer != null
           && appAttempt.masterContainer.getId().equals(
-              containerStatus.getContainerId())) {
-        // container associated with AM. must not be unmanaged 
-        assert appAttempt.submissionContext.getUnmanagedAM() == false;
-        // Setup diagnostic message
-        appAttempt.diagnostics.append("AM Container for " +
-            appAttempt.getAppAttemptId() + " exited with " +
-            " exitCode: " + containerStatus.getExitStatus() +
-            " due to: " +  containerStatus.getDiagnostics() + "." +
-            "Failing this attempt.");
-
-        new FinalTransition(RMAppAttemptState.FAILED).transition(
-            appAttempt, containerFinishedEvent);
-        return RMAppAttemptState.FAILED;
+            containerStatus.getContainerId())) {
+        // Remember the follow up transition and save the final attempt state.
+        appAttempt.rememberTargetTransitionsAndStoreState(event,
+          new ContainerFinishedFinalStateSavedTransition(),
+          RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+        return RMAppAttemptState.FINAL_SAVING;
       }
 
-      // Normal container.
-
-      // Put it in completedcontainers list
+      // Normal container.Put it in completedcontainers list
       appAttempt.justFinishedContainers.add(containerStatus);
       return RMAppAttemptState.RUNNING;
     }
   }
 
+  /**
+   * Follow-up transition run once the final state of an attempt whose AM
+   * container finished has been saved. It records the AM-container
+   * diagnostics and completes the attempt as FAILED.
+   */
+  private static class ContainerFinishedFinalStateSavedTransition extends
+      BaseTransition {
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      RMAppAttemptContainerFinishedEvent amContainerFinished =
+          (RMAppAttemptContainerFinishedEvent) event;
+      // The finished container is the AM's, so the AM cannot be unmanaged.
+      assert !appAttempt.submissionContext.getUnmanagedAM();
+      appAttempt.diagnostics.append(
+          getAMContainerCrashedDiagnostics(amContainerFinished));
+      new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
+          event);
+    }
+  }
+
   private static final class AMFinishingContainerFinishedTransition
       implements
       MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@@ -1228,13 +1452,83 @@ public class RMAppAttemptImpl implements
             appAttempt, containerFinishedEvent);
         return RMAppAttemptState.FINISHED;
       }
-
       // Normal container.
       appAttempt.justFinishedContainers.add(containerStatus);
       return RMAppAttemptState.FINISHING;
     }
   }
 
+  /**
+   * Handles a finished container while the attempt's final state is still
+   * being saved (FINAL_SAVING).
+   * <p>
+   * A non-AM container is added to the just-finished list. If the AM container
+   * finished and the attempt was heading to FAILED or KILLED, the event is
+   * ignored. Otherwise the remembered follow-up becomes
+   * AMFinishedAfterFinalSavingTransition, which uses the earlier
+   * AM-unregistration event, with FINISHED as the target.
+   */
+  private static class ContainerFinishedAtFinalSavingTransition extends
+      BaseTransition {
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+          (RMAppAttemptContainerFinishedEvent) event;
+      ContainerStatus containerStatus =
+          containerFinishedEvent.getContainerStatus();
+
+      // If this is the AM container, the AM container has finished but we
+      // have not yet been acknowledged that the final state was saved, so we
+      // stay in FINAL_SAVING. The null guard mirrors the RUNNING-state
+      // container-finished handling: with no known AM container, the finished
+      // container cannot be the AM's.
+      if (appAttempt.masterContainer != null
+          && appAttempt.masterContainer.getId().equals(
+            containerStatus.getContainerId())) {
+        if (appAttempt.targetedFinalState == RMAppAttemptState.FAILED
+            || appAttempt.targetedFinalState == RMAppAttemptState.KILLED) {
+          // Ignore the AM container's CONTAINER_FINISHED event if we were
+          // supposed to reach FAILED/KILLED state.
+          return;
+        }
+
+        // Pass in the earlier AM-unregistered event as well; it is needed by
+        // AMFinishedAfterFinalSavingTransition later on.
+        appAttempt.rememberTargetTransitions(event,
+          new AMFinishedAfterFinalSavingTransition(
+            appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
+        return;
+      }
+      // Normal container.
+      appAttempt.justFinishedContainers.add(containerStatus);
+    }
+  }
+
+  /**
+   * Completes an attempt as FINISHED. It first applies the unregistration
+   * details from the AM-unregistration event captured when the final save
+   * began.
+   */
+  private static class AMFinishedAfterFinalSavingTransition extends
+      BaseTransition {
+    // Event that caused the final save. updateInfoOnAMUnregister casts it, so
+    // it is expected to be an RMAppAttemptUnregistrationEvent.
+    private final RMAppAttemptEvent amUnregisteredEvent;
+
+    public AMFinishedAfterFinalSavingTransition(
+        RMAppAttemptEvent amUnregisteredEvent) {
+      this.amUnregisteredEvent = amUnregisteredEvent;
+    }
+
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent);
+      new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt,
+        event);
+    }
+  }
+
+  /**
+   * Handles AM expiry while the attempt's final state is still being saved.
+   * It is ignored when the attempt was heading to FAILED or KILLED.
+   * Otherwise the remembered follow-up becomes
+   * AMFinishedAfterFinalSavingTransition, which uses the earlier
+   * AM-unregistration event, with FINISHED as the target.
+   */
+  private static class AMExpiredAtFinalSavingTransition extends
+      BaseTransition {
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      if (appAttempt.targetedFinalState == RMAppAttemptState.FAILED
+          || appAttempt.targetedFinalState == RMAppAttemptState.KILLED) {
+        // Ignore the AM-expired event if we were supposed to reach
+        // FAILED/KILLED state.
+        return;
+      }
+
+      // Pass in the earlier AM-unregistered event as well; it is needed by
+      // AMFinishedAfterFinalSavingTransition later on.
+      appAttempt.rememberTargetTransitions(event,
+        new AMFinishedAfterFinalSavingTransition(
+          appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
+    }
+  }
+
   @Override
   public long getStartTime() {
     this.readLock.lock();
@@ -1256,7 +1550,7 @@ public class RMAppAttemptImpl implements
   }
   
   private void checkAttemptStoreError(RMAppAttemptEvent event) {
-    RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
+    RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
     if(storeEvent.getStoredException() != null)
     {
       // This needs to be handled for HA and give up master status if we got
@@ -1267,7 +1561,7 @@ public class RMAppAttemptImpl implements
     }
   }
   
-  private void storeAttempt(RMStateStore store) {
+  private void storeAttempt() {
     // store attempt data in a non-blocking manner to prevent dispatcher
     // thread starvation and wait for state to be saved
     LOG.info("Storing attempt: AppId: " + 
@@ -1275,7 +1569,7 @@ public class RMAppAttemptImpl implements
               + " AttemptId: " + 
               getAppAttemptId()
               + " MasterContainer: " + masterContainer);
-    store.storeApplicationAttempt(this);
+    rmContext.getStateStore().storeNewApplicationAttempt(this);
   }
 
   private void removeCredentials(RMAppAttemptImpl appAttempt) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Thu Oct 31 17:25:06 2013
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.re
 
 public enum RMAppAttemptState {
   NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, 
-  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED
+  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED,
+  // The attempt's final state is being saved. The attempt stays here until
+  // the save is acknowledged, then moves on to its targeted final state.
+  FINAL_SAVING
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptNewSavedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptNewSavedEvent.java?rev=1537560&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptNewSavedEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptNewSavedEvent.java Thu Oct 31 17:25:06 2013
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+
+/**
+ * ATTEMPT_NEW_SAVED event, raised after storing a newly created application
+ * attempt has completed. Carries the exception from the store operation, if
+ * there was one.
+ */
+public class RMAppAttemptNewSavedEvent extends RMAppAttemptEvent {
+
+  /** Exception from storing the new attempt, or null if none was reported. */
+  final Exception storedException;
+
+  /**
+   * @param appAttemptId the attempt whose store operation completed
+   * @param storedException the store failure, or null if none
+   */
+  public RMAppAttemptNewSavedEvent(ApplicationAttemptId appAttemptId,
+      Exception storedException) {
+    super(appAttemptId, RMAppAttemptEventType.ATTEMPT_NEW_SAVED);
+    this.storedException = storedException;
+  }
+
+  /** @return the store failure, or null if none was reported */
+  public Exception getStoredException() {
+    return this.storedException;
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java Thu Oct 31 17:25:06 2013
@@ -25,20 +25,20 @@ import org.apache.hadoop.yarn.server.res
 
 public class RMAppAttemptUnregistrationEvent extends RMAppAttemptEvent {
 
-  private final String trackingUrl;
+  private final String finalTrackingUrl;
   private final FinalApplicationStatus finalStatus;
   private final String diagnostics;
 
   public RMAppAttemptUnregistrationEvent(ApplicationAttemptId appAttemptId,
       String trackingUrl, FinalApplicationStatus finalStatus, String diagnostics) {
     super(appAttemptId, RMAppAttemptEventType.UNREGISTERED);
-    this.trackingUrl = trackingUrl;
+    this.finalTrackingUrl = trackingUrl;
     this.finalStatus = finalStatus;
     this.diagnostics = diagnostics;
   }
 
-  public String getTrackingUrl() {
-    return this.trackingUrl;
+  public String getFinalTrackingUrl() {
+    return this.finalTrackingUrl;
   }
 
   public FinalApplicationStatus getFinalApplicationStatus() {

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateSavedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateSavedEvent.java?rev=1537560&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateSavedEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateSavedEvent.java Thu Oct 31 17:25:06 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+
+/**
+ * ATTEMPT_UPDATE_SAVED event, presumably raised after an update of an
+ * attempt's stored state has completed. Carries the exception from the
+ * update, if there was one.
+ */
+public class RMAppAttemptUpdateSavedEvent extends RMAppAttemptEvent {
+
+  /** Exception from updating the attempt, or null if none was reported. */
+  final Exception updatedException;
+
+  /**
+   * @param appAttemptId the attempt whose update completed
+   * @param updatedException the update failure, or null if none
+   */
+  public RMAppAttemptUpdateSavedEvent(ApplicationAttemptId appAttemptId,
+      Exception updatedException) {
+    super(appAttemptId, RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED);
+    this.updatedException = updatedException;
+  }
+
+  /** @return the update failure, or null if none was reported */
+  public Exception getUpdatedException() {
+    return this.updatedException;
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1537560&r1=1537559&r2=1537560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Thu Oct 31 17:25:06 2013
@@ -202,6 +202,12 @@ public class MockAM {
     final FinishApplicationMasterRequest req =
         FinishApplicationMasterRequest.newInstance(
           FinalApplicationStatus.SUCCEEDED, "", "");
+    unregisterAppAttempt(req);
+  }
+
+  public void unregisterAppAttempt(final FinishApplicationMasterRequest req)
+      throws Exception {
+    waitForState(RMAppAttemptState.RUNNING);
     UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(attemptId.toString());
     Token<AMRMTokenIdentifier> token =
@@ -216,4 +222,8 @@ public class MockAM {
       }
     });
   }
+
+  /** @return the id of the application attempt this mock AM acts for */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return attemptId;
+  }
 }



Mime
View raw message