hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1540535 [2/5] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/ha...
Date Sun, 10 Nov 2013 20:09:16 GMT
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Sun Nov 10 20:09:09 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -50,10 +51,14 @@ import org.apache.hadoop.yarn.security.c
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 
 @Private
 @Unstable
@@ -86,13 +91,32 @@ public abstract class RMStateStore exten
     final ApplicationAttemptId attemptId;
     final Container masterContainer;
     final Credentials appAttemptCredentials;
+    long startTime = 0;
+    // fields set when attempt completes
+    RMAppAttemptState state;
+    String finalTrackingUrl = "N/A";
+    String diagnostics;
+    FinalApplicationStatus amUnregisteredFinalStatus;
 
     public ApplicationAttemptState(ApplicationAttemptId attemptId,
-        Container masterContainer,
-        Credentials appAttemptCredentials) {
+        Container masterContainer, Credentials appAttemptCredentials,
+        long startTime) {
+      this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
+        null, "", null);
+    }
+
+    public ApplicationAttemptState(ApplicationAttemptId attemptId,
+        Container masterContainer, Credentials appAttemptCredentials,
+        long startTime, RMAppAttemptState state, String finalTrackingUrl,
+        String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) {
       this.attemptId = attemptId;
       this.masterContainer = masterContainer;
       this.appAttemptCredentials = appAttemptCredentials;
+      this.startTime = startTime;
+      this.state = state;
+      this.finalTrackingUrl = finalTrackingUrl;
+      this.diagnostics = diagnostics == null ? "" : diagnostics;
+      this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
     }
 
     public Container getMasterContainer() {
@@ -104,6 +128,21 @@ public abstract class RMStateStore exten
     public Credentials getAppAttemptCredentials() {
       return appAttemptCredentials;
     }
+    public RMAppAttemptState getState(){
+      return state;
+    }
+    public String getFinalTrackingUrl() {
+      return finalTrackingUrl;
+    }
+    public String getDiagnostics() {
+      return diagnostics;
+    }
+    public long getStartTime() {
+      return startTime;
+    }
+    public FinalApplicationStatus getFinalApplicationStatus() {
+      return amUnregisteredFinalStatus;
+    }
   }
   
   /**
@@ -112,15 +151,30 @@ public abstract class RMStateStore exten
   public static class ApplicationState {
     final ApplicationSubmissionContext context;
     final long submitTime;
+    final long startTime;
     final String user;
     Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
                   new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
-    
-    ApplicationState(long submitTime, ApplicationSubmissionContext context,
-        String user) {
+    // fields set when application completes.
+    RMAppState state;
+    String diagnostics;
+    long finishTime;
+
+    public ApplicationState(long submitTime,
+        long startTime, ApplicationSubmissionContext context, String user) {
+      this(submitTime, startTime, context, user, null, "", 0);
+    }
+
+    public ApplicationState(long submitTime,
+        long startTime,ApplicationSubmissionContext context,
+        String user, RMAppState state, String diagnostics, long finishTime) {
       this.submitTime = submitTime;
+      this.startTime = startTime;
       this.context = context;
       this.user = user;
+      this.state = state;
+      this.diagnostics = diagnostics == null ? "" : diagnostics;
+      this.finishTime = finishTime;
     }
 
     public ApplicationId getAppId() {
@@ -129,6 +183,9 @@ public abstract class RMStateStore exten
     public long getSubmitTime() {
       return submitTime;
     }
+    public long getStartTime() {
+      return startTime;
+    }
     public int getAttemptCount() {
       return attempts.size();
     }
@@ -141,6 +198,15 @@ public abstract class RMStateStore exten
     public String getUser() {
       return user;
     }
+    public RMAppState getState() {
+      return state;
+    }
+    public String getDiagnostics() {
+      return diagnostics;
+    }
+    public long getFinishTime() {
+      return finishTime;
+    }
   }
 
   public static class RMDTSecretManagerState {
@@ -195,17 +261,20 @@ public abstract class RMStateStore exten
   }
   
   AsyncDispatcher dispatcher;
-  
-  public synchronized void serviceInit(Configuration conf) throws Exception{    
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception{
     // create async handler
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.register(RMStateStoreEventType.class, 
                         new ForwardingEventHandler());
+    dispatcher.setDrainEventsOnStop();
     initInternal(conf);
   }
-  
-  protected synchronized void serviceStart() throws Exception {
+
+  @Override
+  protected void serviceStart() throws Exception {
     dispatcher.start();
     startInternal();
   }
@@ -222,11 +291,12 @@ public abstract class RMStateStore exten
    */
   protected abstract void startInternal() throws Exception;
 
-  public synchronized void serviceStop() throws Exception {
+  @Override
+  protected void serviceStop() throws Exception {
     closeInternal();
     dispatcher.stop();
   }
-  
+
   /**
    * Derived classes close themselves using this method.
    * The base class will be closed and the event dispatcher will be shutdown 
@@ -249,23 +319,31 @@ public abstract class RMStateStore exten
    * RMAppStoredEvent will be sent on completion to notify the RMApp
    */
   @SuppressWarnings("unchecked")
-  public synchronized void storeApplication(RMApp app) {
+  public synchronized void storeNewApplication(RMApp app) {
     ApplicationSubmissionContext context = app
                                             .getApplicationSubmissionContext();
     assert context instanceof ApplicationSubmissionContextPBImpl;
-    ApplicationState appState = new ApplicationState(
-        app.getSubmitTime(), context, app.getUser());
+    ApplicationState appState =
+        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
+          app.getUser());
     dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
   }
-    
+
+  @SuppressWarnings("unchecked")
+  public synchronized void updateApplicationState(ApplicationState appState) {
+    dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
+  }
+
   /**
    * Blocking API
    * Derived classes must implement this method to store the state of an 
    * application.
    */
-  protected abstract void storeApplicationState(String appId,
-                                      ApplicationStateDataPBImpl appStateData) 
-                                      throws Exception;
+  protected abstract void storeApplicationStateInternal(String appId,
+      ApplicationStateDataPBImpl appStateData) throws Exception;
+
+  protected abstract void updateApplicationStateInternal(String appId,
+      ApplicationStateDataPBImpl appStateData) throws Exception;
   
   @SuppressWarnings("unchecked")
   /**
@@ -274,26 +352,35 @@ public abstract class RMStateStore exten
    * This does not block the dispatcher threads
    * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
    */
-  public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
+  public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
     Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
 
     ApplicationAttemptState attemptState =
         new ApplicationAttemptState(appAttempt.getAppAttemptId(),
-          appAttempt.getMasterContainer(), credentials);
+          appAttempt.getMasterContainer(), credentials,
+          appAttempt.getStartTime());
 
     dispatcher.getEventHandler().handle(
       new RMStateStoreAppAttemptEvent(attemptState));
   }
-  
+
+  @SuppressWarnings("unchecked")
+  public synchronized void updateApplicationAttemptState(
+      ApplicationAttemptState attemptState) {
+    dispatcher.getEventHandler().handle(
+      new RMStateUpdateAppAttemptEvent(attemptState));
+  }
+
   /**
    * Blocking API
    * Derived classes must implement this method to store the state of an 
    * application attempt
    */
-  protected abstract void storeApplicationAttemptState(String attemptId,
-                            ApplicationAttemptStateDataPBImpl attemptStateData) 
-                            throws Exception;
+  protected abstract void storeApplicationAttemptStateInternal(String attemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
 
+  protected abstract void updateApplicationAttemptStateInternal(String attemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
 
   /**
    * RMDTSecretManager call this to store the state of a delegation token
@@ -372,13 +459,14 @@ public abstract class RMStateStore exten
    */
   public synchronized void removeApplication(RMApp app) {
     ApplicationState appState = new ApplicationState(
-            app.getSubmitTime(), app.getApplicationSubmissionContext(),
-            app.getUser());
+            app.getSubmitTime(), app.getStartTime(),
+            app.getApplicationSubmissionContext(), app.getUser());
     for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
       Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
       ApplicationAttemptState attemptState =
           new ApplicationAttemptState(appAttempt.getAppAttemptId(),
-            appAttempt.getMasterContainer(), credentials);
+            appAttempt.getMasterContainer(), credentials,
+            appAttempt.getStartTime());
       appState.attempts.put(attemptState.getAttemptId(), attemptState);
     }
     
@@ -409,7 +497,7 @@ public abstract class RMStateStore exten
   public static final Text AM_CLIENT_TOKEN_MASTER_KEY_NAME =
       new Text("YARN_CLIENT_TOKEN_MASTER_KEY");
   
-  private Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
+  public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
     Credentials credentials = new Credentials();
     Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
     if(appToken != null){
@@ -425,94 +513,124 @@ public abstract class RMStateStore exten
   }
 
   // Dispatcher related code
-  
-  private synchronized void handleStoreEvent(RMStateStoreEvent event) {
-    switch(event.getType()) {
-      case STORE_APP:
-        {
-          ApplicationState apptState =
-              ((RMStateStoreAppEvent) event).getAppState();
-          Exception storedException = null;
-          ApplicationStateDataPBImpl appStateData =
-              new ApplicationStateDataPBImpl();
-          appStateData.setSubmitTime(apptState.getSubmitTime());
-          appStateData.setApplicationSubmissionContext(
-              apptState.getApplicationSubmissionContext());
-          appStateData.setUser(apptState.getUser());
-          ApplicationId appId =
-              apptState.getApplicationSubmissionContext().getApplicationId();
-
-          LOG.info("Storing info for app: " + appId);
-          try {
-            storeApplicationState(appId.toString(), appStateData);
-          } catch (Exception e) {
-            LOG.error("Error storing app: " + appId, e);
-            storedException = e;
-          } finally {
-            notifyDoneStoringApplication(appId, storedException);
-          }
+  protected void handleStoreEvent(RMStateStoreEvent event) {
+    if (event.getType().equals(RMStateStoreEventType.STORE_APP)
+        || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
+      ApplicationState appState = null;
+      if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
+        appState = ((RMStateStoreAppEvent) event).getAppState();
+      } else {
+        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
+        appState = ((RMStateUpdateAppEvent) event).getAppState();
+      }
+
+      Exception storedException = null;
+      ApplicationStateDataPBImpl appStateData =
+          (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
+            .newApplicationStateData(appState.getSubmitTime(),
+              appState.getStartTime(), appState.getUser(),
+              appState.getApplicationSubmissionContext(), appState.getState(),
+              appState.getDiagnostics(), appState.getFinishTime());
+
+      ApplicationId appId =
+          appState.getApplicationSubmissionContext().getApplicationId();
+
+      LOG.info("Storing info for app: " + appId);
+      try {
+        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
+          storeApplicationStateInternal(appId.toString(), appStateData);
+        } else {
+          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
+          updateApplicationStateInternal(appId.toString(), appStateData);
+        }
+      } catch (Exception e) {
+        LOG.error("Error storing app: " + appId, e);
+        storedException = e;
+      } finally {
+        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
+          notifyDoneStoringApplication(appId, storedException);
+        } else {
+          notifyDoneUpdatingApplication(appId, storedException);
         }
-        break;
-      case STORE_APP_ATTEMPT:
-        {
-          ApplicationAttemptState attemptState = 
-                    ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
-          Exception storedException = null;
-
-          Credentials credentials = attemptState.getAppAttemptCredentials();
-          ByteBuffer appAttemptTokens = null;
-          try {
-            if(credentials != null){
-              DataOutputBuffer dob = new DataOutputBuffer();
-                credentials.writeTokenStorageToStream(dob);
-              appAttemptTokens =
-                  ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-            }
-            ApplicationAttemptStateDataPBImpl attemptStateData =
-              (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
-                  .newApplicationAttemptStateData(attemptState.getAttemptId(),
-                    attemptState.getMasterContainer(), appAttemptTokens);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
-            }
-            storeApplicationAttemptState(attemptState.getAttemptId().toString(), 
-                                         attemptStateData);
-          } catch (Exception e) {
-            LOG.error("Error storing appAttempt: " 
-                      + attemptState.getAttemptId(), e);
-            storedException = e;
-          } finally {
-            notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), 
-                                                storedException);            
-          }
+      }
+    } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
+        || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
+
+      ApplicationAttemptState attemptState = null;
+      if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
+        attemptState =
+            ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+      } else {
+        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
+        attemptState =
+            ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
+      }
+
+      Exception storedException = null;
+      Credentials credentials = attemptState.getAppAttemptCredentials();
+      ByteBuffer appAttemptTokens = null;
+      try {
+        if (credentials != null) {
+          DataOutputBuffer dob = new DataOutputBuffer();
+          credentials.writeTokenStorageToStream(dob);
+          appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
         }
-        break;
-      case REMOVE_APP:
-        {
-          ApplicationState appState = 
-                          ((RMStateStoreRemoveAppEvent) event).getAppState();
-          ApplicationId appId = appState.getAppId();
-          Exception removedException = null;
-          LOG.info("Removing info for app: " + appId);
-          try {
-            removeApplicationState(appState);
-          } catch (Exception e) {
-            LOG.error("Error removing app: " + appId, e);
-            removedException = e;
-          } finally {
-            notifyDoneRemovingApplcation(appId, removedException);
-          }
+        ApplicationAttemptStateDataPBImpl attemptStateData =
+            (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
+              .newApplicationAttemptStateData(attemptState.getAttemptId(),
+                attemptState.getMasterContainer(), appAttemptTokens,
+                attemptState.getStartTime(), attemptState.getState(),
+                attemptState.getFinalTrackingUrl(),
+                attemptState.getDiagnostics(),
+                attemptState.getFinalApplicationStatus());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
         }
-        break;
-      default:
-        LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+        if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
+          storeApplicationAttemptStateInternal(attemptState.getAttemptId()
+            .toString(), attemptStateData);
+        } else {
+          assert event.getType().equals(
+            RMStateStoreEventType.UPDATE_APP_ATTEMPT);
+          updateApplicationAttemptStateInternal(attemptState.getAttemptId()
+            .toString(), attemptStateData);
+        }
+      } catch (Exception e) {
+        LOG
+          .error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
+        storedException = e;
+      } finally {
+        if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
+          notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+            storedException);
+        } else {
+          notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
+            storedException);
+        }
+      }
+    } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
+      ApplicationState appState =
+          ((RMStateStoreRemoveAppEvent) event).getAppState();
+      ApplicationId appId = appState.getAppId();
+      Exception removedException = null;
+      LOG.info("Removing info for app: " + appId);
+      try {
+        removeApplicationState(appState);
+      } catch (Exception e) {
+        LOG.error("Error removing app: " + appId, e);
+        removedException = e;
+      } finally {
+        notifyDoneRemovingApplcation(appId, removedException);
+      }
+    } else {
+      LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
     }
   }
 
   @SuppressWarnings("unchecked")
   /**
    * In (@link handleStoreEvent}, this method is called to notify the
-   * application about operation completion
+   * application that the new application is stored in the state store
    * @param appId id of the application that has been saved
    * @param storedException the exception that is thrown when storing the
    * application
@@ -520,19 +638,33 @@ public abstract class RMStateStore exten
   private void notifyDoneStoringApplication(ApplicationId appId,
                                                   Exception storedException) {
     rmDispatcher.getEventHandler().handle(
-        new RMAppStoredEvent(appId, storedException));
+        new RMAppNewSavedEvent(appId, storedException));
   }
-  
+
+  @SuppressWarnings("unchecked")
+  private void notifyDoneUpdatingApplication(ApplicationId appId,
+      Exception storedException) {
+    rmDispatcher.getEventHandler().handle(
+      new RMAppUpdateSavedEvent(appId, storedException));
+  }
+
   @SuppressWarnings("unchecked")
   /**
    * In (@link handleStoreEvent}, this method is called to notify the
-   * application attempt about operation completion
+   * application attempt that the new attempt is stored in the state store
    * @param appAttempt attempt that has been saved
    */
   private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
                                                   Exception storedException) {
     rmDispatcher.getEventHandler().handle(
-        new RMAppAttemptStoredEvent(attemptId, storedException));
+        new RMAppAttemptNewSavedEvent(attemptId, storedException));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
+      Exception updatedException) {
+    rmDispatcher.getEventHandler().handle(
+      new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
   }
 
   @SuppressWarnings("unchecked")

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java Sun Nov 10 20:09:09 2013
@@ -21,5 +21,7 @@ package org.apache.hadoop.yarn.server.re
 public enum RMStateStoreEventType {
   STORE_APP_ATTEMPT,
   STORE_APP,
+  UPDATE_APP,
+  UPDATE_APP_ATTEMPT,
   REMOVE_APP
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Sun Nov 10 20:09:09 2013
@@ -18,7 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -27,6 +34,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -37,9 +46,6 @@ import org.apache.hadoop.yarn.security.c
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ZKUtil;
-
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -51,13 +57,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
 
 @Private
 @Unstable
@@ -224,8 +224,11 @@ public class ZKRMStateStore extends RMSt
                 ApplicationStateDataProto.parseFrom(childData));
         ApplicationState appState =
             new ApplicationState(appStateData.getSubmitTime(),
-                appStateData.getApplicationSubmissionContext(),
-                appStateData.getUser());
+              appStateData.getStartTime(),
+              appStateData.getApplicationSubmissionContext(),
+              appStateData.getUser(),
+              appStateData.getState(),
+              appStateData.getDiagnostics(), appStateData.getFinishTime());
         if (!appId.equals(appState.context.getApplicationId())) {
           throw new YarnRuntimeException("The child node name is different " +
               "from the application id");
@@ -249,7 +252,12 @@ public class ZKRMStateStore extends RMSt
         }
         ApplicationAttemptState attemptState =
             new ApplicationAttemptState(attemptId,
-                attemptStateData.getMasterContainer(), credentials);
+              attemptStateData.getMasterContainer(), credentials,
+              attemptStateData.getStartTime(),
+              attemptStateData.getState(),
+              attemptStateData.getFinalTrackingUrl(),
+              attemptStateData.getDiagnostics(),
+              attemptStateData.getFinalApplicationStatus());
         if (!attemptId.equals(attemptState.getAttemptId())) {
           throw new YarnRuntimeException("The child node name is different " +
               "from the application attempt id");
@@ -280,21 +288,34 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  public synchronized void storeApplicationState(
-      String appId, ApplicationStateDataPBImpl appStateDataPB) throws
-      Exception {
+  public synchronized void storeApplicationStateInternal(String appId,
+      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
     String nodeCreatePath = getNodePath(rmAppRoot, appId);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
     }
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
-    createWithRetries(
-        nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
+    createWithRetries(nodeCreatePath, appStateData, zkAcl,
+      CreateMode.PERSISTENT);
+
+  }
+
+  @Override
+  public synchronized void updateApplicationStateInternal(String appId,
+      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+    String nodeCreatePath = getNodePath(rmAppRoot, appId);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing final state info for app: " + appId + " at: "
+          + nodeCreatePath);
+    }
+    byte[] appStateData = appStateDataPB.getProto().toByteArray();
+    setDataWithRetries(nodeCreatePath, appStateData, 0);
   }
 
   @Override
-  public synchronized void storeApplicationAttemptState(
+  public synchronized void storeApplicationAttemptStateInternal(
       String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
     String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
@@ -304,7 +325,20 @@ public class ZKRMStateStore extends RMSt
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
     createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
-        CreateMode.PERSISTENT);
+      CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public synchronized void updateApplicationAttemptStateInternal(
+      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      throws Exception {
+    String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing final state info for attempt: " + attemptId + " at: "
+          + nodeCreatePath);
+    }
+    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
+    setDataWithRetries(nodeCreatePath, attemptStateData, 0);
   }
 
   @Override

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Sun Nov 10 20:09:09 2013
@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 
 /*
  * Contains the state data that needs to be persisted for an ApplicationAttempt
@@ -61,4 +63,50 @@ public interface ApplicationAttemptState
   public ByteBuffer getAppAttemptTokens();
 
   public void setAppAttemptTokens(ByteBuffer attemptTokens);
+
+  /**
+   * Get the final state of the application attempt.
+   * @return the final state of the application attempt.
+   */
+  public RMAppAttemptState getState();
+
+  public void setState(RMAppAttemptState state);
+
+  /**
+   * Get the original not-proxied <em>final tracking url</em> for the
+   * application. This is intended to only be used by the proxy itself.
+   * 
+   * @return the original not-proxied <em>final tracking url</em> for the
+   *         application
+   */
+  public String getFinalTrackingUrl();
+
+  /**
+   * Set the final tracking Url of the AM.
+   * @param url the final tracking Url of the AM
+   */
+  public void setFinalTrackingUrl(String url);
+  /**
+   * Get the <em>diagnostic information</em> of the attempt.
+   * @return <em>diagnostic information</em> of the attempt
+   */
+  public String getDiagnostics();
+
+  public void setDiagnostics(String diagnostics);
+
+  /**
+   * Get the <em>start time</em> of the application.
+   * @return <em>start time</em> of the application
+   */
+  public long getStartTime();
+
+  public void setStartTime(long startTime);
+
+  /**
+   * Get the <em>final finish status</em> of the application.
+   * @return <em>final finish status</em> of the application
+   */
+  public FinalApplicationStatus getFinalApplicationStatus();
+
+  public void setFinalApplicationStatus(FinalApplicationStatus finishState);
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Sun Nov 10 20:09:09 2013
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 
 /**
  * Contains all the state data that needs to be stored persistently 
@@ -42,7 +45,19 @@ public interface ApplicationStateData {
   @Public
   @Unstable
   public void setSubmitTime(long submitTime);
-  
+
+  /**
+   * Get the <em>start time</em> of the application.
+   * @return <em>start time</em> of the application
+   */
+  @Public
+  @Stable
+  public abstract long getStartTime();
+
+  @Private
+  @Unstable
+  public abstract void setStartTime(long startTime);
+
   /**
    * The application submitter
    */
@@ -66,6 +81,29 @@ public interface ApplicationStateData {
   @Public
   @Unstable
   public void setApplicationSubmissionContext(
-                                          ApplicationSubmissionContext context);
+      ApplicationSubmissionContext context);
+
+  /**
+   * Get the final state of the application.
+   * @return the final state of the application.
+   */
+  public RMAppState getState();
+
+  public void setState(RMAppState state);
+
+  /**
+   * Get the diagnostics information for the application master.
+   * @return the diagnostics information for the application master.
+   */
+  public String getDiagnostics();
+
+  public void setDiagnostics(String diagnostics);
+
+  /**
+   * The finish time of the application.
+   * @return the finish time of the application.
+   */
+  public long getFinishTime();
 
+  public void setFinishTime(long finishTime);
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Sun Nov 10 20:09:09 2013
@@ -22,14 +22,19 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppAttemptStateProto;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 
 public class ApplicationAttemptStateDataPBImpl
 extends ProtoBase<ApplicationAttemptStateDataProto> 
@@ -156,14 +161,125 @@ implements ApplicationAttemptStateData {
     this.appAttemptTokens = attemptTokens;
   }
 
+  @Override
+  public RMAppAttemptState getState() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasAppAttemptState()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getAppAttemptState());
+  }
+
+  @Override
+  public void setState(RMAppAttemptState state) {
+    maybeInitBuilder();
+    if (state == null) {
+      builder.clearAppAttemptState();
+      return;
+    }
+    builder.setAppAttemptState(convertToProtoFormat(state));
+  }
+
+  @Override
+  public String getFinalTrackingUrl() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasFinalTrackingUrl()) {
+      return null;
+    }
+    return p.getFinalTrackingUrl();
+  }
+
+  @Override
+  public void setFinalTrackingUrl(String url) {
+    maybeInitBuilder();
+    if (url == null) {
+      builder.clearFinalTrackingUrl();
+      return;
+    }
+    builder.setFinalTrackingUrl(url);
+  }
+
+  @Override
+  public String getDiagnostics() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasDiagnostics()) {
+      return null;
+    }
+    return p.getDiagnostics();
+  }
+
+  @Override
+  public void setDiagnostics(String diagnostics) {
+    maybeInitBuilder();
+    if (diagnostics == null) {
+      builder.clearDiagnostics();
+      return;
+    }
+    builder.setDiagnostics(diagnostics);
+  }
+
+  @Override
+  public long getStartTime() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getStartTime();
+  }
+
+  @Override
+  public void setStartTime(long startTime) {
+    maybeInitBuilder();
+    builder.setStartTime(startTime);
+  }
+
+  @Override
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasFinalApplicationStatus()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getFinalApplicationStatus());
+  }
+
+  @Override
+  public void setFinalApplicationStatus(FinalApplicationStatus finishState) {
+    maybeInitBuilder();
+    if (finishState == null) {
+      builder.clearFinalApplicationStatus();
+      return;
+    }
+    builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
+  }
+
   public static ApplicationAttemptStateData newApplicationAttemptStateData(
       ApplicationAttemptId attemptId, Container container,
-      ByteBuffer attemptTokens) {
+      ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+      String finalTrackingUrl, String diagnostics,
+      FinalApplicationStatus amUnregisteredFinalStatus) {
     ApplicationAttemptStateData attemptStateData =
         recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
     attemptStateData.setAttemptId(attemptId);
     attemptStateData.setMasterContainer(container);
     attemptStateData.setAppAttemptTokens(attemptTokens);
+    attemptStateData.setState(finalState);
+    attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
+    attemptStateData.setDiagnostics(diagnostics);
+    attemptStateData.setStartTime(startTime);
+    attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
     return attemptStateData;
   }
+
+  private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
+  public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
+    return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());
+  }
+  public static RMAppAttemptState convertFromProtoFormat(RMAppAttemptStateProto e) {
+    return RMAppAttemptState.valueOf(e.name().replace(RM_APP_ATTEMPT_PREFIX, ""));
+  }
+
+  private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus s) {
+    return ProtoUtils.convertToProtoFormat(s);
+  }
+  private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
+    return ProtoUtils.convertFromProtoFormat(s);
+  }
+
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Sun Nov 10 20:09:09 2013
@@ -21,14 +21,20 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 
 public class ApplicationStateDataPBImpl 
 extends ProtoBase<ApplicationStateDataProto> 
 implements ApplicationStateData {
-  
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
   ApplicationStateDataProto proto = 
             ApplicationStateDataProto.getDefaultInstance();
   ApplicationStateDataProto.Builder builder = null;
@@ -92,6 +98,18 @@ implements ApplicationStateData {
   }
 
   @Override
+  public long getStartTime() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getStartTime();
+  }
+
+  @Override
+  public void setStartTime(long startTime) {
+    maybeInitBuilder();
+    builder.setStartTime(startTime);
+  }
+
+  @Override
   public String getUser() {
     ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
     if (!p.hasUser()) {
@@ -132,4 +150,78 @@ implements ApplicationStateData {
     this.applicationSubmissionContext = context;
   }
 
+  @Override
+  public RMAppState getState() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasApplicationState()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getApplicationState());
+  }
+
+  @Override
+  public void setState(RMAppState finalState) {
+    maybeInitBuilder();
+    if (finalState == null) {
+      builder.clearApplicationState();
+      return;
+    }
+    builder.setApplicationState(convertToProtoFormat(finalState));
+  }
+
+  @Override
+  public String getDiagnostics() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasDiagnostics()) {
+      return null;
+    }
+    return p.getDiagnostics();
+  }
+
+  @Override
+  public void setDiagnostics(String diagnostics) {
+    maybeInitBuilder();
+    if (diagnostics == null) {
+      builder.clearDiagnostics();
+      return;
+    }
+    builder.setDiagnostics(diagnostics);
+  }
+
+  @Override
+  public long getFinishTime() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getFinishTime();
+  }
+
+  @Override
+  public void setFinishTime(long finishTime) {
+    maybeInitBuilder();
+    builder.setFinishTime(finishTime);
+  }
+
+  public static ApplicationStateData newApplicationStateData(long submitTime,
+      long startTime, String user,
+      ApplicationSubmissionContext submissionContext, RMAppState state,
+      String diagnostics, long finishTime) {
+
+    ApplicationStateData appState =
+        recordFactory.newRecordInstance(ApplicationStateData.class);
+    appState.setSubmitTime(submitTime);
+    appState.setStartTime(startTime);
+    appState.setUser(user);
+    appState.setApplicationSubmissionContext(submissionContext);
+    appState.setState(state);
+    appState.setDiagnostics(diagnostics);
+    appState.setFinishTime(finishTime);
+    return appState;
+  }
+
+  private static String RM_APP_PREFIX = "RMAPP_";
+  public static RMAppStateProto convertToProtoFormat(RMAppState e) {
+    return RMAppStateProto.valueOf(RM_APP_PREFIX + e.name());
+  }
+  public static RMAppState convertFromProtoFormat(RMAppStateProto e) {
+    return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, ""));
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Sun Nov 10 20:09:09 2013
@@ -35,6 +35,7 @@ public enum RMAppEventType {
   NODE_UPDATE,
 
   // Source: RMStateStore
-  APP_SAVED,
+  APP_NEW_SAVED,
+  APP_UPDATE_SAVED,
   APP_REMOVED
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Sun Nov 10 20:09:09 2013
@@ -54,10 +54,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -103,7 +101,8 @@ public class RMAppImpl implements RMApp,
 
   // Mutable fields
   private long startTime;
-  private long finishTime;
+  private long finishTime = 0;
+  private long storedFinishTime = 0;
   private RMAppAttempt currentAttempt;
   private String queue;
   @SuppressWarnings("rawtypes")
@@ -111,8 +110,11 @@ public class RMAppImpl implements RMApp,
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
-  private boolean isAppRemovalRequestSent = false;
-  private RMAppState previousStateAtRemoving;
+  private RMAppState stateBeforeFinalSaving;
+  private RMAppEvent eventCausingFinalSaving;
+  private RMAppState targetedFinalState;
+  private RMAppState recoveredFinalState;
+  Object transitionTodo;
 
   private static final StateMachineFactory<RMAppImpl,
                                            RMAppState,
@@ -129,32 +131,45 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppSavingTransition())
-    .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
-        RMAppEventType.RECOVER, new StartAppAttemptTransition())
-    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
-        new AppKilledTransition())
-    .addTransition(RMAppState.NEW, RMAppState.FAILED,
-        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
+            RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
+            RMAppState.FINAL_SAVING),
+        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
+    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
+        RMAppEventType.APP_REJECTED,
+        new FinalSavingTransition(
+          new AppRejectedTransition(), RMAppState.FAILED))
 
     // Transitions from NEW_SAVING state
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
-        RMAppEventType.APP_SAVED, new StartAppAttemptTransition())
-    .addTransition(RMAppState.NEW_SAVING, RMAppState.KILLED,
-        RMAppEventType.KILL, new AppKilledTransition())
-    .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED,
-        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+        RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition())
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.APP_REJECTED,
+          new FinalSavingTransition(new AppRejectedTransition(),
+            RMAppState.FAILED))
 
      // Transitions from SUBMITTED state
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
-        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+        RMAppEventType.APP_REJECTED,
+        new FinalSavingTransition(
+          new AppRejectedTransition(), RMAppState.FAILED))
     .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
         RMAppEventType.APP_ACCEPTED)
-    .addTransition(RMAppState.SUBMITTED, RMAppState.KILLED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new KillAppAndAttemptTransition(), RMAppState.KILLED))
 
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -162,37 +177,45 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
-        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
+        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new KillAppAndAttemptTransition(), RMAppState.KILLED))
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
-          RMAppEventType.ATTEMPT_UNREGISTERED,
-        new RMAppRemovingTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_UNREGISTERED,
+        new FinalSavingTransition(
+          new AttemptUnregisteredTransition(),
+          RMAppState.FINISHING, RMAppState.FINISHED))
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
+      // UnManagedAM directly jumps to finished
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
     .addTransition(RMAppState.RUNNING,
-        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
+        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
-
-     // Transitions from REMOVING state
-    .addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
-        RMAppEventType.APP_REMOVED,  new RMAppFinishingTransition())
-    .addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
-        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
-    .addTransition(RMAppState.REMOVING, RMAppState.KILLED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+
+     // Transitions from FINAL_SAVING state
+    .addTransition(RMAppState.FINAL_SAVING,
+      EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED,
+        RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED,
+        new FinalStateSavedTransition())
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_FINISHED,
+        new AttemptFinishedAtFinalSavingTransition())
     // ignorable transitions
-    .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
-        RMAppEventType.NODE_UPDATE)
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+        EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL))
 
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@@ -201,7 +224,7 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-      EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
+      EnumSet.of(RMAppEventType.NODE_UPDATE))
 
      // Transitions from FINISHED state
      // ignorable transitions
@@ -210,14 +233,12 @@ public class RMAppImpl implements RMApp,
             RMAppEventType.NODE_UPDATE,
             RMAppEventType.ATTEMPT_UNREGISTERED,
             RMAppEventType.ATTEMPT_FINISHED,
-            RMAppEventType.KILL,
-            RMAppEventType.APP_REMOVED))
+            RMAppEventType.KILL))
 
      // Transitions from FAILED state
      // ignorable transitions
     .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
-          RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
 
      // Transitions from KILLED state
      // ignorable transitions
@@ -227,8 +248,7 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE,
-            RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
+            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
 
      .installTopology();
 
@@ -316,9 +336,8 @@ public class RMAppImpl implements RMApp,
   @Override
   public RMAppState getState() {
     this.readLock.lock();
-
     try {
-      return this.stateMachine.getCurrentState();
+        return this.stateMachine.getCurrentState();
     } finally {
       this.readLock.unlock();
     }
@@ -398,7 +417,7 @@ public class RMAppImpl implements RMApp,
     case SUBMITTED:
     case ACCEPTED:
     case RUNNING:
-    case REMOVING:
+    case FINAL_SAVING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
     case FINISHING:
@@ -586,8 +605,12 @@ public class RMAppImpl implements RMApp,
   @Override
   public void recover(RMState state) throws Exception{
     ApplicationState appState = state.getApplicationState().get(getApplicationId());
+    this.recoveredFinalState = appState.getState();
     LOG.info("Recovering app: " + getApplicationId() + " with " + 
-            + appState.getAttemptCount() + " attempts");
+        + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState );
+    this.diagnostics.append(appState.getDiagnostics());
+    this.storedFinishTime = appState.getFinishTime();
+    this.startTime = appState.getStartTime();
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
       createNewAttempt(false);
@@ -632,60 +655,195 @@ public class RMAppImpl implements RMApp,
           nodeUpdateEvent.getNode());
     };
   }
-  
+
+  private static final class RMAppRecoveredTransition implements
+      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+    @Override
+    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+
+      if (app.recoveredFinalState != null) {
+        FINAL_TRANSITION.transition(app, event);
+        return app.recoveredFinalState;
+      }
+      // Directly call AttemptFailedTransition, since now we deem that an
+      // application fails because of RM restart as a normal AM failure.
+
+      // Do not recover unmanaged applications since current recovery 
+      // mechanism of restarting attempts does not work for them.
+      // This will need to be changed in work preserving recovery in which 
+      // RM will re-connect with the running AM's instead of restarting them
+
+      // In work-preserve restart, if attemptCount == maxAttempts, the job still
+      // needs to be recovered because the last attempt may still be running.
+
+      // As part of YARN-1210, we may return ACCEPTED state waiting for AM to
+      // reregister or fail and remove the following code.
+      return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
+        event);
+    }
+  }
+
   private static final class StartAppAttemptTransition extends RMAppTransition {
+    @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      if (event.getType().equals(RMAppEventType.APP_SAVED)) {
-        assert app.getState().equals(RMAppState.NEW_SAVING);
-        RMAppStoredEvent storeEvent = (RMAppStoredEvent) event;
-        if(storeEvent.getStoredException() != null) {
-          // For HA this exception needs to be handled by giving up
-          // master status if we got fenced
-          LOG.error("Failed to store application: "
-              + storeEvent.getApplicationId(),
-              storeEvent.getStoredException());
-          ExitUtil.terminate(1, storeEvent.getStoredException());
-        }
+      RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
+      if (storeEvent.getStoredException() != null) {
+        // For HA this exception needs to be handled by giving up
+        // master status if we got fenced
+        LOG.error(
+          "Failed to store application: " + storeEvent.getApplicationId(),
+          storeEvent.getStoredException());
+        ExitUtil.terminate(1, storeEvent.getStoredException());
       }
-
       app.createNewAttempt(true);
     };
   }
 
-  private static final class RMAppFinishingTransition extends RMAppTransition {
+  private static final class FinalStateSavedTransition implements
+      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+      RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
+      if (storeEvent.getUpdatedException() != null) {
+        LOG.error("Failed to update the final state of application"
+              + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
+        ExitUtil.terminate(1, storeEvent.getUpdatedException());
+      }
+
+      if (app.transitionTodo instanceof SingleArcTransition) {
+        ((SingleArcTransition) app.transitionTodo).transition(app,
+          app.eventCausingFinalSaving);
+      } else if (app.transitionTodo instanceof MultipleArcTransition) {
+        ((MultipleArcTransition) app.transitionTodo).transition(app,
+          app.eventCausingFinalSaving);
+      }
+      return app.targetedFinalState;
+
+    }
+  }
+
+  private static class AttemptFailedFinalStateSavedTransition extends
+      RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      if (event.getType().equals(RMAppEventType.APP_REMOVED)) {
-        RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event;
-        if (removeEvent.getRemovedException() != null) {
-          LOG.error(
-            "Failed to remove application: " + removeEvent.getApplicationId(),
-            removeEvent.getRemovedException());
-          ExitUtil.terminate(1, removeEvent.getRemovedException());
-        }
+      String msg = null;
+      if (event instanceof RMAppFailedAttemptEvent) {
+        msg = app.getAppAttemptFailedDiagnostics(event);
       }
-      app.finishTime = System.currentTimeMillis();
+      LOG.info(msg);
+      app.diagnostics.append(msg);
+      // Inform the node for app-finish
+      FINAL_TRANSITION.transition(app, event);
+    }
+  }
+
+  private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
+    String msg = null;
+    RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+    if (this.submissionContext.getUnmanagedAM()) {
+      // RM does not manage the AM. Do not retry
+      msg = "Unmanaged application " + this.getApplicationId()
+              + " failed due to " + failedEvent.getDiagnostics()
+              + ". Failing the application.";
+    } else if (this.attempts.size() >= this.maxAppAttempts) {
+      msg = "Application " + this.getApplicationId() + " failed "
+              + this.maxAppAttempts + " times due to "
+              + failedEvent.getDiagnostics() + ". Failing the application.";
     }
+    return msg;
   }
 
   private static final class RMAppSavingTransition extends RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
+
       // If recovery is enabled then store the application information in a
       // non-blocking call so make sure that RM has stored the information
       // needed to restart the AM after RM restart without further client
       // communication
       LOG.info("Storing application with id " + app.applicationId);
-      app.rmContext.getStateStore().storeApplication(app);
+      app.rmContext.getStateStore().storeNewApplication(app);
     }
   }
 
-  private static final class RMAppRemovingTransition extends RMAppTransition {
+  private void rememberTargetTransitions(RMAppEvent event,
+      Object transitionToDo, RMAppState targetFinalState) {
+    transitionTodo = transitionToDo;
+    targetedFinalState = targetFinalState;
+    eventCausingFinalSaving = event;
+  }
+
+  private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
+      Object transitionToDo, RMAppState targetFinalState,
+      RMAppState stateToBeStored) {
+    rememberTargetTransitions(event, transitionToDo, targetFinalState);
+    this.stateBeforeFinalSaving = getState();
+    this.storedFinishTime = System.currentTimeMillis();
+
+    LOG.info("Updating application " + this.applicationId
+        + " with final state: " + this.targetedFinalState);
+    // we lost attempt_finished diagnostics in app, because attempt_finished
+    // diagnostics is sent after app final state is saved. Later on, we will
+    // create GetApplicationAttemptReport specifically for getting per attempt
+    // info.
+    String diags = null;
+    switch (event.getType()) {
+    case APP_REJECTED:
+      RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event;
+      diags = rejectedEvent.getMessage();
+      break;
+    case ATTEMPT_FINISHED:
+      RMAppFinishedAttemptEvent finishedEvent =
+          (RMAppFinishedAttemptEvent) event;
+      diags = finishedEvent.getDiagnostics();
+      break;
+    case ATTEMPT_FAILED:
+      RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+      diags = getAppAttemptFailedDiagnostics(failedEvent);
+      break;
+    case KILL:
+      diags = getAppKilledDiagnostics();
+      break;
+    default:
+      break;
+    }
+    ApplicationState appState =
+        new ApplicationState(this.submitTime, this.startTime,
+          this.submissionContext, this.user, stateToBeStored, diags,
+          this.storedFinishTime);
+    this.rmContext.getStateStore().updateApplicationState(appState);
+  }
+
+  private static final class FinalSavingTransition extends RMAppTransition {
+    Object transitionToDo;
+    RMAppState targetedFinalState;
+    RMAppState stateToBeStored;
+
+    public FinalSavingTransition(Object transitionToDo,
+        RMAppState targetedFinalState) {
+      this(transitionToDo, targetedFinalState, targetedFinalState);
+    }
+
+    public FinalSavingTransition(Object transitionToDo,
+        RMAppState targetedFinalState, RMAppState stateToBeStored) {
+      this.transitionToDo = transitionToDo;
+      this.targetedFinalState = targetedFinalState;
+      this.stateToBeStored = stateToBeStored;
+    }
+
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      LOG.info("Removing application with id " + app.applicationId);
-      app.removeApplicationState();
-      app.previousStateAtRemoving = app.getState();
+      app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
+        targetedFinalState, stateToBeStored);
+    }
+  }
+
+  private static class AttemptUnregisteredTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      app.finishTime = app.storedFinishTime;
     }
   }
 
@@ -698,6 +856,40 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  private static class AttemptFinishedAtFinalSavingTransition extends
+      RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      if (app.targetedFinalState.equals(RMAppState.FAILED)
+          || app.targetedFinalState.equals(RMAppState.KILLED)) {
+        // Ignore Attempt_Finished event if we were supposed to reach FAILED
+        // or KILLED state
+        return;
+      }
+
+      // pass in the earlier attempt_unregistered event, as it is needed in
+      // AppFinishedFinalStateSavedTransition later on
+      app.rememberTargetTransitions(event,
+        new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving),
+        RMAppState.FINISHED);
+    };
+  }
+
+  private static class AppFinishedFinalStateSavedTransition extends
+      RMAppTransition {
+    RMAppEvent attemptUnregistered;
+
+    public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) {
+      this.attemptUnregistered = attemptUnregistered;
+    }
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      new AttemptUnregisteredTransition().transition(app, attemptUnregistered);
+      FINISHED_TRANSITION.transition(app, event);
+    };
+  }
+
+
   private static class AppKilledTransition extends FinalTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
@@ -706,6 +898,10 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  private static String getAppKilledDiagnostics() {
+    return "Application killed by user.";
+  }
+
   private static class KillAppAndAttemptTransition extends AppKilledTransition {
     @SuppressWarnings("unchecked")
     @Override
@@ -741,12 +937,10 @@ public class RMAppImpl implements RMApp,
         app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
       }
-      if (app.getState() != RMAppState.FINISHING) {
+      app.finishTime = app.storedFinishTime;
+      if (app.finishTime == 0 ) {
         app.finishTime = System.currentTimeMillis();
       }
-      // application completely done and remove from state store.
-      app.removeApplicationState();
-
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -764,32 +958,15 @@ public class RMAppImpl implements RMApp,
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-
-      RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event);
-      boolean retryApp = true;
-      String msg = null;
-      if (app.submissionContext.getUnmanagedAM()) {
-        // RM does not manage the AM. Do not retry
-        retryApp = false;
-        msg = "Unmanaged application " + app.getApplicationId()
-            + " failed due to " + failedEvent.getDiagnostics()
-            + ". Failing the application.";
-      } else if (app.attempts.size() >= app.maxAppAttempts) {
-        retryApp = false;
-        msg = "Application " + app.getApplicationId() + " failed "
-            + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics()
-            + ". Failing the application.";
-      }
-
-      if (retryApp) {
+      if (!app.submissionContext.getUnmanagedAM()
+          && app.attempts.size() < app.maxAppAttempts) {
         app.createNewAttempt(true);
         return initialState;
       } else {
-        LOG.info(msg);
-        app.diagnostics.append(msg);
-        // Inform the node for app-finish
-        FINAL_TRANSITION.transition(app, event);
-        return RMAppState.FAILED;
+        app.rememberTargetTransitionsAndStoreState(event,
+          new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
+          RMAppState.FAILED);
+        return RMAppState.FINAL_SAVING;
       }
     }
 
@@ -814,9 +991,9 @@ public class RMAppImpl implements RMApp,
   @Override
   public YarnApplicationState createApplicationState() {
     RMAppState rmAppState = getState();
-    // If App is in REMOVING state, return its previous state.
-    if (rmAppState.equals(RMAppState.REMOVING)) {
-      rmAppState = previousStateAtRemoving;
+    // If App is in FINAL_SAVING state, return its previous state.
+    if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
+      rmAppState = stateBeforeFinalSaving;
     }
     switch (rmAppState) {
     case NEW:
@@ -840,11 +1017,4 @@ public class RMAppImpl implements RMApp,
       throw new YarnRuntimeException("Unknown state passed!");
     }
   }
-
-  private void removeApplicationState(){
-    if (!isAppRemovalRequestSent) {
-      rmContext.getStateStore().removeApplication(this);
-      isAppRemovalRequestSent = true;
-    }
-  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Sun Nov 10 20:09:09 2013
@@ -24,7 +24,7 @@ public enum RMAppState {
   SUBMITTED,
   ACCEPTED,
   RUNNING,
-  REMOVING,
+  FINAL_SAVING,
   FINISHING,
   FINISHED,
   FAILED,

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Sun Nov 10 20:09:09 2013
@@ -41,7 +41,8 @@ public enum RMAppAttemptEventType {
   CONTAINER_FINISHED,
   
   // Source: RMStateStore
-  ATTEMPT_SAVED,
+  ATTEMPT_NEW_SAVED,
+  ATTEMPT_UPDATE_SAVED,
 
   // Source: Scheduler
   APP_REJECTED,



Mime
View raw message