hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1551444 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-...
Date Tue, 17 Dec 2013 02:16:20 GMT
Author: vinodkv
Date: Tue Dec 17 02:16:20 2013
New Revision: 1551444

URL: http://svn.apache.org/r1551444
Log:
YARN-1446. Changed client API to retry killing application till RM acknowledges so as to account for RM crashes/failover. Contributed by Jian He.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Dec 17 02:16:20 2013
@@ -178,6 +178,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1435. Modified Distributed Shell to accept either the command or the
     custom script. (Xuan Gong via zjshen)
 
+    YARN-1446. Changed client API to retry killing application till RM
+    acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java Tue Dec 17 02:16:20 2013
@@ -26,10 +26,21 @@ import org.apache.hadoop.yarn.api.Applic
 import org.apache.hadoop.yarn.util.Records;
 
 /**
- * <p>The response sent by the <code>ResourceManager</code> to the client
- * aborting a submitted application.</p>
- *
- * <p>Currently it's empty.</p>
+ * <p>
+ * The response sent by the <code>ResourceManager</code> to the client aborting
+ * a submitted application.
+ * </p>
+ * <p>
+ * The response includes:
+ * <ul>
+ * <li>A flag which indicates whether the process of killing the application
+ * has completed.</li>
+ * </ul>
+ * Note: users are advised to wait until this flag becomes true; otherwise, if
+ * the <code>ResourceManager</code> crashes before the process of killing the
+ * application is completed, the <code>ResourceManager</code> may retry this
+ * application on recovery.
+ * </p>
  * 
  * @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest)
  */
@@ -38,9 +49,24 @@ import org.apache.hadoop.yarn.util.Recor
 public abstract class KillApplicationResponse {
   @Private
   @Unstable
-  public static KillApplicationResponse newInstance() {
+  public static KillApplicationResponse newInstance(boolean isKillCompleted) {
     KillApplicationResponse response =
         Records.newRecord(KillApplicationResponse.class);
+    response.setIsKillCompleted(isKillCompleted);
     return response;
   }
+
+  /**
+   * Get the flag which indicates whether the process of killing the application is completed.
+   */
+  @Public
+  @Stable
+  public abstract boolean getIsKillCompleted();
+
+  /**
+   * Set the flag which indicates whether the process of killing the application is completed.
+   */
+  @Private
+  @Unstable
+  public abstract void setIsKillCompleted(boolean isKillCompleted);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Dec 17 02:16:20 2013
@@ -27,7 +27,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
@@ -882,15 +881,23 @@ public class YarnConfiguration extends C
   ////////////////////////////////
 
   /**
+   * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead.
    * The interval of the yarn client's querying application state after
    * application submission. The unit is millisecond.
    */
+  @Deprecated
   public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
       YARN_PREFIX + "client.app-submission.poll-interval";
-  public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
-      1000;
 
   /**
+   * The interval that the yarn client library uses to poll the completion
+   * status of the asynchronous API of application client protocol.
+   */
+  public static final String YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
+      YARN_PREFIX + "client.application-client-protocol.poll-interval-ms";
+  public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
+      200;
+  /**
    * Max number of threads in NMClientAsync to process container management
    * events
    */

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Tue Dec 17 02:16:20 2013
@@ -116,6 +116,7 @@ message KillApplicationRequestProto {
 }
 
 message KillApplicationResponseProto {
+  optional bool is_kill_completed = 1 [default = false];
 }
 
 message GetClusterMetricsRequestProto {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java Tue Dec 17 02:16:20 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -79,7 +80,8 @@ public class YarnClientImpl extends Yarn
 
   protected ApplicationClientProtocol rmClient;
   protected InetSocketAddress rmAddress;
-  protected long statePollIntervalMillis;
+  protected long submitPollIntervalMillis;
+  private long asyncApiPollIntervalMillis;
 
   private static final String ROOT = "root";
 
@@ -92,12 +94,20 @@ public class YarnClientImpl extends Yarn
       YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     this.rmAddress = getRmAddress(conf);
-    statePollIntervalMillis = conf.getLong(
+    asyncApiPollIntervalMillis =
+        conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+    submitPollIntervalMillis = asyncApiPollIntervalMillis;
+    if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS)
+        != null) {
+      submitPollIntervalMillis = conf.getLong(
         YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
-        YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
+        YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+    }
     super.serviceInit(conf);
   }
 
@@ -165,7 +175,7 @@ public class YarnClientImpl extends Yarn
             " is still in " + state);
       }
       try {
-        Thread.sleep(statePollIntervalMillis);
+        Thread.sleep(submitPollIntervalMillis);
       } catch (InterruptedException ie) {
       }
     }
@@ -179,11 +189,29 @@ public class YarnClientImpl extends Yarn
   @Override
   public void killApplication(ApplicationId applicationId)
       throws YarnException, IOException {
-    LOG.info("Killing application " + applicationId);
     KillApplicationRequest request =
         Records.newRecord(KillApplicationRequest.class);
     request.setApplicationId(applicationId);
-    rmClient.forceKillApplication(request);
+
+    try {
+      int pollCount = 0;
+      while (true) {
+        KillApplicationResponse response =
+            rmClient.forceKillApplication(request);
+        if (response.getIsKillCompleted()) {
+          break;
+        }
+        if (++pollCount % 10 == 0) {
+          LOG.info("Watiting for application " + applicationId
+              + " to be killed.");
+        }
+        Thread.sleep(asyncApiPollIntervalMillis);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting for application " + applicationId
+          + " to be killed.");
+    }
+    LOG.info("Killed application " + applicationId);
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java Tue Dec 17 02:16:20 2013
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -89,6 +91,7 @@ public class TestYarnClient {
     rm.stop();
   }
 
+  @SuppressWarnings("deprecation")
   @Test (timeout = 30000)
   public void testSubmitApplication() {
     Configuration conf = new Configuration();
@@ -128,6 +131,23 @@ public class TestYarnClient {
     client.stop();
   }
 
+  @Test
+  public void testKillApplication() throws Exception {
+    MockRM rm = new MockRM();
+    rm.start();
+    RMApp app = rm.submitApp(2000);
+
+    Configuration conf = new Configuration();
+    @SuppressWarnings("resource")
+    final YarnClient client = new MockYarnClient();
+    client.init(conf);
+    client.start();
+
+    client.killApplication(app.getApplicationId());
+    verify(((MockYarnClient) client).getRMClient(), times(2))
+      .forceKillApplication(any(KillApplicationRequest.class));
+  }
+
   @Test(timeout = 30000)
   public void testApplicationType() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -234,6 +254,11 @@ public class TestYarnClient {
             GetApplicationReportRequest.class))).thenReturn(mockResponse);
         when(rmClient.getApplications(any(GetApplicationsRequest.class)))
             .thenReturn(mockAppResponse);
+        // return false for 1st kill request, and true for the 2nd.
+        when(rmClient.forceKillApplication(any(
+          KillApplicationRequest.class)))
+          .thenReturn(KillApplicationResponse.newInstance(false)).thenReturn(
+            KillApplicationResponse.newInstance(true));
       } catch (YarnException e) {
         Assert.fail("Exception is not expected.");
       } catch (IOException e) {
@@ -242,6 +267,10 @@ public class TestYarnClient {
       when(mockResponse.getApplicationReport()).thenReturn(mockReport);
     }
 
+    public ApplicationClientProtocol getRMClient() {
+      return rmClient;
+    }
+
     @Override
     public List<ApplicationReport> getApplications(
         Set<String> applicationTypes, EnumSet<YarnApplicationState> applicationStates)

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java Tue Dec 17 02:16:20 2013
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProtoOrBuilder;
 
 import com.google.protobuf.TextFormat;
 
@@ -67,4 +68,24 @@ public class KillApplicationResponsePBIm
   public String toString() {
     return TextFormat.shortDebugString(getProto());
   }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = KillApplicationResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public boolean getIsKillCompleted() {
+    KillApplicationResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getIsKillCompleted();
+  }
+
+  @Override
+  public void setIsKillCompleted(boolean isKillCompleted) {
+    maybeInitBuilder();
+    builder.setIsKillCompleted(isKillCompleted);
+  }
 }  

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Tue Dec 17 02:16:20 2013
@@ -945,10 +945,10 @@
 
   <!-- Other configuration -->
   <property>
-    <description>The interval of the yarn client's querying application state
-    after application submission. The unit is millisecond.</description>
-    <name>yarn.client.app-submission.poll-interval</name>
-    <value>1000</value>
+    <description>The interval that the yarn client library uses to poll the
+    completion status of the asynchronous API of application client protocol.
+    </description>
+    <name>yarn.client.application-client-protocol.poll-interval-ms</name>
+    <value>200</value>
   </property>
-
 </configuration>

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Dec 17 02:16:20 2013
@@ -292,15 +292,15 @@ public class ApplicationMasterService ex
       
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
 
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
-              .getTrackingUrl(), request.getFinalApplicationStatus(), request
-              .getDiagnostics()));
-
       if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
-          .isAppSafeToUnregister()) {
+          .isAppSafeToTerminate()) {
         return FinishApplicationMasterResponse.newInstance(true);
       } else {
+        // keep sending the unregister event as RM may crash in the meanwhile.
+        rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
+              .getTrackingUrl(), request.getFinalApplicationStatus(), request
+              .getDiagnostics()));
         return FinishApplicationMasterResponse.newInstance(false);
       }
     }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Dec 17 02:16:20 2013
@@ -380,14 +380,15 @@ public class ClientRMService extends Abs
           + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
     }
 
-    this.rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppEvent(applicationId, RMAppEventType.KILL));
-
-    RMAuditLogger.logSuccess(callerUGI.getShortUserName(), 
-        AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId);
-    KillApplicationResponse response = recordFactory
-        .newRecordInstance(KillApplicationResponse.class);
-    return response;
+    if (application.isAppSafeToTerminate()) {
+      RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
+        AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
+      return KillApplicationResponse.newInstance(true);
+    } else {
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
+      return KillApplicationResponse.newInstance(false);
+    }
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Tue Dec 17 02:16:20 2013
@@ -197,13 +197,13 @@ public interface RMApp extends EventHand
   String getApplicationType(); 
 
   /**
-   * Check whether this application is safe to unregister.
-   * An application is deemed to be safe to unregister if it is an unmanaged
-   * AM or its state has been removed from state store.
+   * Check whether this application is safe to terminate.
+   * An application is deemed to be safe to terminate if it is an unmanaged
+   * AM or its state has been saved in state store.
    * @return the flag which indicates whether this application is safe to
-   *         unregister.
+   *         terminate.
    */
-  boolean isAppSafeToUnregister();
+  boolean isAppSafeToTerminate();
 
   /**
    * Create the external user-facing state of ApplicationMaster from the

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Tue Dec 17 02:16:20 2013
@@ -37,5 +37,4 @@ public enum RMAppEventType {
   // Source: RMStateStore
   APP_NEW_SAVED,
   APP_UPDATE_SAVED,
-  APP_REMOVED
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Dec 17 02:16:20 2013
@@ -110,10 +110,14 @@ public class RMAppImpl implements RMApp,
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
+
+  // These stored states are only valid while the app is KILLING or FINAL_SAVING.
+  private RMAppState stateBeforeKilling;
   private RMAppState stateBeforeFinalSaving;
   private RMAppEvent eventCausingFinalSaving;
   private RMAppState targetedFinalState;
   private RMAppState recoveredFinalState;
+
   Object transitionTodo;
 
   private static final StateMachineFactory<RMAppImpl,
@@ -166,10 +170,8 @@ public class RMAppImpl implements RMApp,
           new AppRejectedTransition(), RMAppState.FAILED))
     .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
         RMAppEventType.APP_ACCEPTED)
-    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.SUBMITTED, RMAppState.KILLING,
+        RMAppEventType.KILL,new KillAttemptTransition())
 
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -180,10 +182,8 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
+        RMAppEventType.KILL,new KillAttemptTransition())
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -200,10 +200,8 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
+        RMAppEventType.KILL, new KillAttemptTransition())
 
      // Transitions from FINAL_SAVING state
     .addTransition(RMAppState.FINAL_SAVING,
@@ -221,11 +219,27 @@ public class RMAppImpl implements RMApp,
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
-    .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-      EnumSet.of(RMAppEventType.NODE_UPDATE))
+      EnumSet.of(RMAppEventType.NODE_UPDATE,
+        // ignore Kill as we have already saved the final Finished state in
+        // state store.
+        RMAppEventType.KILL))
+
+     // Transitions from KILLING state
+    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_KILLED,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+        EnumSet.of(
+            RMAppEventType.NODE_UPDATE,
+            RMAppEventType.ATTEMPT_REGISTERED,
+            RMAppEventType.ATTEMPT_UNREGISTERED,
+            RMAppEventType.ATTEMPT_FINISHED,
+            RMAppEventType.ATTEMPT_FAILED,
+            RMAppEventType.APP_UPDATE_SAVED,
+            RMAppEventType.KILL))
 
      // Transitions from FINISHED state
      // ignorable transitions
@@ -249,7 +263,7 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
+            RMAppEventType.NODE_UPDATE))
 
      .installTopology();
 
@@ -419,6 +433,7 @@ public class RMAppImpl implements RMApp,
     case ACCEPTED:
     case RUNNING:
     case FINAL_SAVING:
+    case KILLING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
     case FINISHING:
@@ -681,7 +696,7 @@ public class RMAppImpl implements RMApp,
       }
 
       // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved。
+      // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
         app.createNewAttempt(true);
         return RMAppState.SUBMITTED;
@@ -811,7 +826,7 @@ public class RMAppImpl implements RMApp,
       RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
       diags = getAppAttemptFailedDiagnostics(failedEvent);
       break;
-    case KILL:
+    case ATTEMPT_KILLED:
       diags = getAppKilledDiagnostics();
       break;
     default:
@@ -901,7 +916,7 @@ public class RMAppImpl implements RMApp,
   private static class AppKilledTransition extends FinalTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.diagnostics.append("Application killed by user.");
+      app.diagnostics.append(getAppKilledDiagnostics());
       super.transition(app, event);
     };
   }
@@ -910,15 +925,16 @@ public class RMAppImpl implements RMApp,
     return "Application killed by user.";
   }
 
-  private static class KillAppAndAttemptTransition extends AppKilledTransition {
+  private static class KillAttemptTransition extends RMAppTransition {
     @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
-          RMAppAttemptEventType.KILL));
-      super.transition(app, event);
+      app.stateBeforeKilling = app.getState();
+      app.handler.handle(new RMAppAttemptEvent(app.currentAttempt
+        .getAppAttemptId(), RMAppAttemptEventType.KILL));
     }
   }
+
   private static final class AppRejectedTransition extends
       FinalTransition{
     public void transition(RMAppImpl app, RMAppEvent event) {
@@ -986,7 +1002,7 @@ public class RMAppImpl implements RMApp,
   }
 
   @Override
-  public boolean isAppSafeToUnregister() {
+  public boolean isAppSafeToTerminate() {
     RMAppState state = getState();
     return state.equals(RMAppState.FINISHING)
         || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
@@ -1003,6 +1019,9 @@ public class RMAppImpl implements RMApp,
     if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
       rmAppState = stateBeforeFinalSaving;
     }
+    if (rmAppState.equals(RMAppState.KILLING)) {
+      rmAppState = stateBeforeKilling;
+    }
     switch (rmAppState) {
     case NEW:
       return YarnApplicationState.NEW;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Tue Dec 17 02:16:20 2013
@@ -28,5 +28,6 @@ public enum RMAppState {
   FINISHING,
   FINISHED,
   FAILED,
+  KILLING,
   KILLED
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Dec 17 02:16:20 2013
@@ -361,6 +361,8 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.STATUS_UPDATE,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
+            // ignore Kill as we have already saved the final Finished state in
+            // state store.
               RMAppAttemptEventType.KILL))
 
       // Transitions from FINISHED State

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Dec 17 02:16:20 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.Applic
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -277,12 +278,10 @@ public class MockRM extends ResourceMana
         node.getState());
   }
 
-  public void killApp(ApplicationId appId) throws Exception {
+  public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
-    KillApplicationRequest req = Records
-        .newRecord(KillApplicationRequest.class);
-    req.setApplicationId(appId);
-    client.forceKillApplication(req);
+    KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
+    return client.forceKillApplication(req);
   }
 
   // from AMLauncher

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Dec 17 02:16:20 2013
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -76,8 +77,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -414,10 +416,8 @@ public class TestRMRestart {
     MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
     // assert the previous AM state is loaded back on RM recovery.
-    RMApp recoveredApp =
-        rm2.getRMContext().getRMApps().get(app0.getApplicationId());
-    Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
-      .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
+
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     rm1.stop();
     rm2.stop();
   }
@@ -964,8 +964,8 @@ public class TestRMRestart {
     Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), 
                         attemptState.getMasterContainer().getId());
 
-    // Setting AMLivelinessMonitor interval to be 10 Secs. 
-    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
+    // Setting AMLivelinessMonitor interval to be 3 Secs.
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
     // start new RM   
     MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
@@ -1494,6 +1494,69 @@ public class TestRMRestart {
     Assert.assertTrue(rm1.getServiceState() == STATE.STOPPED);
   }
 
+  // Tests that a client which keeps retrying the kill request waits until the
+  // app reaches the KILLED state, and also checks that the attempt state is
+  // saved before the app state is saved.
+  @Test
+  public void testClientRetryOnKillingApplication() throws Exception {
+    MemoryRMStateStore memStore = new TestMemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    RMApp app1 =
+        rm1.submitApp(200, "name", "user", null, false, "default", 1, null,
+          "myType");
+    MockAM am1 = launchAM(app1, rm1, nm1);
+
+    KillApplicationResponse response;
+    int count = 0;
+    while (true) {
+      response = rm1.killApp(app1.getApplicationId());
+      if (response.getIsKillCompleted()) {
+        break;
+      }
+      Thread.sleep(100);
+      count++;
+    }
+    // We expect at least 2 calls to killApp, as the first killApp call always
+    // returns false.
+    Assert.assertTrue(count >= 1);
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+    Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt);
+    Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
+  }
+
+  public class TestMemoryRMStateStore extends MemoryRMStateStore {
+    int count = 0;
+    public int updateApp = 0;
+    public int updateAttempt = 0;
+
+    @Override
+    public void updateApplicationStateInternal(String appId,
+        ApplicationStateDataPBImpl appStateData) throws Exception {
+      updateApp = ++count;
+      super.updateApplicationStateInternal(appId, appStateData);
+    }
+
+    @Override
+    public synchronized void
+        updateApplicationAttemptStateInternal(String attemptIdStr,
+            ApplicationAttemptStateDataPBImpl attemptStateData)
+            throws Exception {
+      updateAttempt = ++count;
+      super.updateApplicationAttemptStateInternal(attemptIdStr,
+        attemptStateData);
+    }
+  }
+
   public static class TestSecurityMockRM extends MockRM {
 
     public TestSecurityMockRM(Configuration conf, RMStateStore store) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Tue Dec 17 02:16:20 2013
@@ -145,7 +145,7 @@ public abstract class MockAsm extends Mo
     }
 
     @Override
-    public boolean isAppSafeToUnregister() {
+    public boolean isAppSafeToTerminate() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Tue Dec 17 02:16:20 2013
@@ -218,7 +218,7 @@ public class MockRMApp implements RMApp 
   }
 
   @Override
-  public boolean isAppSafeToUnregister() {
+  public boolean isAppSafeToTerminate() {
     return true;
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1551444&r1=1551443&r2=1551444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Tue Dec 17 02:16:20 2013
@@ -301,12 +301,9 @@ public class TestRMAppTransitions {
 
   private void assertAppAndAttemptKilled(RMApp application)
       throws InterruptedException {
+    sendAttemptUpdateSavedEvent(application);
     sendAppUpdateSavedEvent(application);
     assertKilled(application);
-    // send attempt final state saved event.
-    application.getCurrentAppAttempt().handle(
-      new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
-        .getAppAttemptId(), null));
     Assert.assertEquals(RMAppAttemptState.KILLED, application
       .getCurrentAppAttempt().getAppAttemptState());
     assertAppFinalStateSaved(application);
@@ -329,6 +326,12 @@ public class TestRMAppTransitions {
     rmDispatcher.await();
   }
 
+  private void sendAttemptUpdateSavedEvent(RMApp application) {
+    application.getCurrentAppAttempt().handle(
+      new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
+        .getAppAttemptId(), null));
+  }
+
   protected RMApp testCreateAppNewSaving(
       ApplicationSubmissionContext submissionContext) throws IOException {
   RMApp application = createNewTestApp(submissionContext);
@@ -624,11 +627,12 @@ public class TestRMAppTransitions {
     rmDispatcher.await();
 
     // Ignore Attempt_Finished if we were supposed to go to Finished.
-    assertAppState(RMAppState.FINAL_SAVING, application);
+    assertAppState(RMAppState.KILLING, application);
     RMAppEvent finishEvent =
         new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
     application.handle(finishEvent);
-    assertAppState(RMAppState.FINAL_SAVING, application);
+    assertAppState(RMAppState.KILLING, application);
+    sendAttemptUpdateSavedEvent(application);
     sendAppUpdateSavedEvent(application);
     assertKilled(application);
   }
@@ -686,8 +690,8 @@ public class TestRMAppTransitions {
   }
 
   @Test
-  public void testAppFinishingKill() throws IOException {
-    LOG.info("--- START: testAppFinishedFinished ---");
+  public void testAppAtFinishingIgnoreKill() throws IOException {
+    LOG.info("--- START: testAppAtFinishingIgnoreKill ---");
 
     RMApp application = testCreateAppFinishing(null);
     // FINISHING => FINISHED event RMAppEventType.KILL
@@ -695,7 +699,7 @@ public class TestRMAppTransitions {
         new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-    assertAppState(RMAppState.FINISHED, application);
+    assertAppState(RMAppState.FINISHING, application);
   }
 
   // While App is at FINAL_SAVING, Attempt_Finished event may come before
@@ -780,6 +784,7 @@ public class TestRMAppTransitions {
         new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
+    sendAttemptUpdateSavedEvent(application);
     sendAppUpdateSavedEvent(application);
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
@@ -801,14 +806,6 @@ public class TestRMAppTransitions {
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
 
-    // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED
-    event = 
-        new RMAppEvent(application.getApplicationId(), 
-            RMAppEventType.ATTEMPT_KILLED);
-    application.handle(event);
-    rmDispatcher.await();
-    assertTimesAtFinish(application);
-    assertAppState(RMAppState.KILLED, application);
 
     // KILLED => KILLED event RMAppEventType.KILL
     event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);



Mime
View raw message