hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject svn commit: r1552467 [2/2] - in /hadoop/common/branches/HDFS-4685/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/j...
Date Fri, 20 Dec 2013 01:01:26 GMT
Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Fri Dec 20 01:01:18 2013
@@ -22,6 +22,8 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
@@ -51,13 +53,13 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected void storeApplicationStateInternal(String appId,
+  protected void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception {
     // Do nothing
   }
 
   @Override
-  protected void storeApplicationAttemptStateInternal(String attemptId,
+  protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
     // Do nothing
   }
@@ -92,13 +94,13 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected void updateApplicationStateInternal(String appId,
+  protected void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception {
     // Do nothing 
   }
 
   @Override
-  protected void updateApplicationAttemptStateInternal(String attemptId,
+  protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
   }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Fri Dec 20 01:01:18 2013
@@ -387,10 +387,10 @@ public abstract class RMStateStore exten
    * Derived classes must implement this method to store the state of an 
    * application.
    */
-  protected abstract void storeApplicationStateInternal(String appId,
+  protected abstract void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception;
 
-  protected abstract void updateApplicationStateInternal(String appId,
+  protected abstract void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception;
   
   @SuppressWarnings("unchecked")
@@ -424,10 +424,12 @@ public abstract class RMStateStore exten
    * Derived classes must implement this method to store the state of an 
    * application attempt
    */
-  protected abstract void storeApplicationAttemptStateInternal(String attemptId,
+  protected abstract void storeApplicationAttemptStateInternal(
+      ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
 
-  protected abstract void updateApplicationAttemptStateInternal(String attemptId,
+  protected abstract void updateApplicationAttemptStateInternal(
+      ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
 
   /**
@@ -592,11 +594,11 @@ public abstract class RMStateStore exten
       LOG.info("Storing info for app: " + appId);
       try {
         if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-          storeApplicationStateInternal(appId.toString(), appStateData);
+          storeApplicationStateInternal(appId, appStateData);
           notifyDoneStoringApplication(appId, storedException);
         } else {
           assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-          updateApplicationStateInternal(appId.toString(), appStateData);
+          updateApplicationStateInternal(appId, appStateData);
           notifyDoneUpdatingApplication(appId, storedException);
         }
       } catch (Exception e) {
@@ -637,15 +639,15 @@ public abstract class RMStateStore exten
           LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
         }
         if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-          storeApplicationAttemptStateInternal(attemptState.getAttemptId()
-            .toString(), attemptStateData);
+          storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+              attemptStateData);
           notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
               storedException);
         } else {
           assert event.getType().equals(
             RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-          updateApplicationAttemptStateInternal(attemptState.getAttemptId()
-            .toString(), attemptStateData);
+          updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+              attemptStateData);
           notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
               storedException);
         }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Fri Dec 20 01:01:18 2013
@@ -78,16 +78,51 @@ public class ZKRMStateStore extends RMSt
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
   protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
       .newInstance(1, 0);
+  private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
+      "RMDelegationTokensRoot";
+  private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
+      "RMDTSequentialNumber";
+  private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
+      "RMDTMasterKeysRoot";
   private int numRetries;
 
   private String zkHostPort = null;
   private int zkSessionTimeout;
   private long zkRetryInterval;
   private List<ACL> zkAcl;
+
+  /**
+   *
+   * ROOT_DIR_PATH
+   * |--- VERSION_INFO
+   * |--- RM_ZK_FENCING_LOCK
+   * |--- RM_APP_ROOT
+   * |     |----- (#ApplicationId1)
+   * |     |        |----- (#ApplicationAttemptIds)
+   * |     |
+   * |     |----- (#ApplicationId2)
+   * |     |       |----- (#ApplicationAttemptIds)
+   * |     ....
+   * |
+   * |--- RM_DT_SECRET_MANAGER_ROOT
+   *        |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
+   *        |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
+   *        |       |----- Token_1
+   *        |       |----- Token_2
+   *        |       ....
+   *        |
+   *        |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
+   *        |      |----- Key_1
+   *        |      |----- Key_2
+   *                ....
+   *
+   */
   private String zkRootNodePath;
-  private String rmDTSecretManagerRoot;
   private String rmAppRoot;
-  private String dtSequenceNumberPath = null;
+  private String rmDTSecretManagerRoot;
+  private String dtMasterKeysRootPath;
+  private String delegationTokensRootPath;
+  private String dtSequenceNumberPath;
 
   @VisibleForTesting
   protected String znodeWorkingPath;
@@ -178,12 +213,11 @@ public class ZKRMStateStore extends RMSt
       throw bafe;
     }
 
-    zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
-    rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
-    rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+    zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
+    rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
 
     /* Initialize fencing related paths, acls, and ops */
-    fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK;
+    fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
     createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
         CreateMode.PERSISTENT);
     deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
@@ -204,6 +238,15 @@ public class ZKRMStateStore extends RMSt
         zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
       }
     }
+
+    rmDTSecretManagerRoot =
+        getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
+    dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
+    delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
+    dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
   }
 
   @Override
@@ -217,8 +260,11 @@ public class ZKRMStateStore extends RMSt
     if (HAUtil.isHAEnabled(getConfig())){
       fence();
     }
-    createRootDir(rmDTSecretManagerRoot);
     createRootDir(rmAppRoot);
+    createRootDir(rmDTSecretManagerRoot);
+    createRootDir(dtMasterKeysRootPath);
+    createRootDir(delegationTokensRootPath);
+    createRootDir(dtSequenceNumberPath);
   }
 
   private void createRootDir(final String rootPath) throws Exception {
@@ -350,26 +396,69 @@ public class ZKRMStateStore extends RMSt
 
   private synchronized void loadRMDTSecretManagerState(RMState rmState)
       throws Exception {
-    List<String> childNodes =
-        getChildrenWithRetries(rmDTSecretManagerRoot, true);
+    loadRMDelegationKeyState(rmState);
+    loadRMSequentialNumberState(rmState);
+    loadRMDelegationTokenState(rmState);
+  }
 
+  private void loadRMDelegationKeyState(RMState rmState) throws Exception {
+    List<String> childNodes =
+        getChildrenWithRetries(dtMasterKeysRootPath, true);
     for (String childNodeName : childNodes) {
-      if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
-        rmState.rmSecretManagerState.dtSequenceNumber =
-            Integer.parseInt(childNodeName.split("_")[1]);
+      String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
+      byte[] childData = getDataWithRetries(childNodePath, true);
+
+      if (childData == null) {
+        LOG.warn("Content of " + childNodePath + " is broken.");
         continue;
       }
-      String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
-      byte[] childData = getDataWithRetries(childNodePath, true);
 
       ByteArrayInputStream is = new ByteArrayInputStream(childData);
       DataInputStream fsIn = new DataInputStream(is);
+
       try {
         if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
           DelegationKey key = new DelegationKey();
           key.readFields(fsIn);
           rmState.rmSecretManagerState.masterKeyState.add(key);
-        } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+        }
+      } finally {
+        is.close();
+      }
+    }
+  }
+
+  private void loadRMSequentialNumberState(RMState rmState) throws Exception {
+    byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
+    if (seqData != null) {
+      ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
+      DataInputStream seqIn = new DataInputStream(seqIs);
+
+      try {
+        rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
+      } finally {
+        seqIn.close();
+      }
+    }
+  }
+
+  private void loadRMDelegationTokenState(RMState rmState) throws Exception {
+    List<String> childNodes = zkClient.getChildren(delegationTokensRootPath, true);
+    for (String childNodeName : childNodes) {
+      String childNodePath =
+          getNodePath(delegationTokensRootPath, childNodeName);
+      byte[] childData = getDataWithRetries(childNodePath, true);
+
+      if (childData == null) {
+        LOG.warn("Content of " + childNodePath + " is broken.");
+        continue;
+      }
+
+      ByteArrayInputStream is = new ByteArrayInputStream(childData);
+      DataInputStream fsIn = new DataInputStream(is);
+
+      try {
+        if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
           RMDelegationTokenIdentifier identifier =
               new RMDelegationTokenIdentifier();
           identifier.readFields(fsIn);
@@ -385,8 +474,6 @@ public class ZKRMStateStore extends RMSt
 
   private synchronized void loadRMAppState(RMState rmState) throws Exception {
     List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
-    List<ApplicationAttemptState> attempts =
-        new ArrayList<ApplicationAttemptState>();
     for (String childNodeName : childNodes) {
       String childNodePath = getNodePath(rmAppRoot, childNodeName);
       byte[] childData = getDataWithRetries(childNodePath, true);
@@ -411,17 +498,28 @@ public class ZKRMStateStore extends RMSt
               "from the application id");
         }
         rmState.appState.put(appId, appState);
-      } else if (childNodeName
-          .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
-        // attempt
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Loading application attempt from znode: " + childNodeName);
-        }
+        loadApplicationAttemptState(appState, appId);
+      } else {
+        LOG.info("Unknown child node with name: " + childNodeName);
+      }
+    }
+  }
+
+  private void loadApplicationAttemptState(ApplicationState appState,
+      ApplicationId appId)
+      throws Exception {
+    String appPath = getNodePath(rmAppRoot, appId.toString());
+    List<String> attempts = getChildrenWithRetries(appPath, false);
+    for (String attemptIDStr : attempts) {
+      if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+        String attemptPath = getNodePath(appPath, attemptIDStr);
+        byte[] attemptData = getDataWithRetries(attemptPath, true);
+
         ApplicationAttemptId attemptId =
-            ConverterUtils.toApplicationAttemptId(childNodeName);
+            ConverterUtils.toApplicationAttemptId(attemptIDStr);
         ApplicationAttemptStateDataPBImpl attemptStateData =
             new ApplicationAttemptStateDataPBImpl(
-                ApplicationAttemptStateDataProto.parseFrom(childData));
+                ApplicationAttemptStateDataProto.parseFrom(attemptData));
         Credentials credentials = null;
         if (attemptStateData.getAppAttemptTokens() != null) {
           credentials = new Credentials();
@@ -429,47 +527,26 @@ public class ZKRMStateStore extends RMSt
           dibb.reset(attemptStateData.getAppAttemptTokens());
           credentials.readTokenStorageStream(dibb);
         }
+
         ApplicationAttemptState attemptState =
             new ApplicationAttemptState(attemptId,
-              attemptStateData.getMasterContainer(), credentials,
-              attemptStateData.getStartTime(),
-              attemptStateData.getState(),
-              attemptStateData.getFinalTrackingUrl(),
-              attemptStateData.getDiagnostics(),
-              attemptStateData.getFinalApplicationStatus());
-        if (!attemptId.equals(attemptState.getAttemptId())) {
-          throw new YarnRuntimeException("The child node name is different " +
-              "from the application attempt id");
-        }
-        attempts.add(attemptState);
-      } else {
-        LOG.info("Unknown child node with name: " + childNodeName);
-      }
-    }
+                attemptStateData.getMasterContainer(), credentials,
+                attemptStateData.getStartTime(),
+                attemptStateData.getState(),
+                attemptStateData.getFinalTrackingUrl(),
+                attemptStateData.getDiagnostics(),
+                attemptStateData.getFinalApplicationStatus());
 
-    // go through all attempts and add them to their apps
-    for (ApplicationAttemptState attemptState : attempts) {
-      ApplicationId appId = attemptState.getAttemptId().getApplicationId();
-      ApplicationState appState = rmState.appState.get(appId);
-      if (appState != null) {
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
-      } else {
-        // the application znode may have been removed when the application
-        // completed but the RM might have stopped before it could remove the
-        // application attempt znodes
-        LOG.info("Application node not found for attempt: "
-            + attemptState.getAttemptId());
-        deleteWithRetries(
-            getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
       }
     }
     LOG.info("Done Loading applications from ZK state store");
   }
 
   @Override
-  public synchronized void storeApplicationStateInternal(String appId,
+  public synchronized void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, appId);
+    String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -481,25 +558,29 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  public synchronized void updateApplicationStateInternal(String appId,
+  public synchronized void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, appId);
+    String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing final state info for app: " + appId + " at: "
-          + nodeCreatePath);
+          + nodeUpdatePath);
     }
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
-    setDataWithRetries(nodeCreatePath, appStateData, 0);
+    setDataWithRetries(nodeUpdatePath, appStateData, 0);
   }
 
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    String appDirPath = getNodePath(rmAppRoot,
+        appAttemptId.getApplicationId().toString());
+    String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing info for attempt: " + attemptId + " at: "
+      LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
           + nodeCreatePath);
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -509,31 +590,36 @@ public class ZKRMStateStore extends RMSt
 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    String appAttemptIdStr = appAttemptId.toString();
+    String appDirPath = getNodePath(rmAppRoot, appIdStr);
+    String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing final state info for attempt: " + attemptId + " at: "
-          + nodeCreatePath);
+      LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+          + " at: " + nodeUpdatePath);
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-    setDataWithRetries(nodeCreatePath, attemptStateData, 0);
+    setDataWithRetries(nodeUpdatePath, attemptStateData, 0);
   }
 
   @Override
   public synchronized void removeApplicationStateInternal(ApplicationState appState)
       throws Exception {
     String appId = appState.getAppId().toString();
-    String nodeRemovePath = getNodePath(rmAppRoot, appId);
+    String appIdRemovePath = getNodePath(rmAppRoot, appId);
     ArrayList<Op> opList = new ArrayList<Op>();
-    opList.add(Op.delete(nodeRemovePath, -1));
 
     for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
-      String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
+      String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
       opList.add(Op.delete(attemptRemovePath, -1));
     }
+    opList.add(Op.delete(appIdRemovePath, -1));
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
+      LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
           + " and its attempts.");
     }
     doMultiWithRetries(opList);
@@ -546,38 +632,37 @@ public class ZKRMStateStore extends RMSt
     ArrayList<Op> opList = new ArrayList<Op>();
     // store RM delegation token
     String nodeCreatePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
-    ByteArrayOutputStream os = new ByteArrayOutputStream();
-    DataOutputStream fsOut = new DataOutputStream(os);
+    ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
+    DataOutputStream tokenOut = new DataOutputStream(tokenOs);
+    ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+    DataOutputStream seqOut = new DataOutputStream(seqOs);
+
     try {
-      rmDTIdentifier.write(fsOut);
-      fsOut.writeLong(renewDate);
+      rmDTIdentifier.write(tokenOut);
+      tokenOut.writeLong(renewDate);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Storing RMDelegationToken_" +
             rmDTIdentifier.getSequenceNumber());
       }
-      opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
+
+      opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
           CreateMode.PERSISTENT));
-    } finally {
-      os.close();
-    }
 
-    // store sequence number
-    String latestSequenceNumberPath =
-        getNodePath(rmDTSecretManagerRoot,
-            DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
-          latestSequenceNumber);
-    }
 
-    if (dtSequenceNumberPath != null) {
-      opList.add(Op.delete(dtSequenceNumberPath, -1));
+     seqOut.writeInt(latestSequenceNumber);
+     if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing " + dtSequenceNumberPath +
+            ". SequenceNumber: " + latestSequenceNumber);
+      }
+
+     opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
+    } finally {
+      tokenOs.close();
+      seqOs.close();
     }
-    opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
-        CreateMode.PERSISTENT));
-    dtSequenceNumberPath = latestSequenceNumberPath;
+
     doMultiWithRetries(opList);
   }
 
@@ -585,7 +670,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void removeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
     String nodeRemovePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationToken_"
@@ -598,7 +683,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void storeRMDTMasterKeyState(
       DelegationKey delegationKey) throws Exception {
     String nodeCreatePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
             + delegationKey.getKeyId());
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     DataOutputStream fsOut = new DataOutputStream(os);
@@ -618,7 +703,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void removeRMDTMasterKeyState(
       DelegationKey delegationKey) throws Exception {
     String nodeRemovePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
             + delegationKey.getKeyId());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
@@ -757,8 +842,7 @@ public class ZKRMStateStore extends RMSt
     return new ZKAction<byte[]>() {
       @Override
       public byte[] run() throws KeeperException, InterruptedException {
-        Stat stat = new Stat();
-        return zkClient.getData(path, watch, stat);
+        return zkClient.getData(path, watch, null);
       }
     }.runWithRetries();
   }
@@ -865,4 +949,5 @@ public class ZKRMStateStore extends RMSt
     zk.register(new ForwardingWatcher());
     return zk;
   }
+
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Fri Dec 20 01:01:18 2013
@@ -197,13 +197,13 @@ public interface RMApp extends EventHand
   String getApplicationType(); 
 
   /**
-   * Check whether this application is safe to unregister.
-   * An application is deemed to be safe to unregister if it is an unmanaged
-   * AM or its state has been removed from state store.
+   * Check whether this application is safe to terminate.
+   * An application is deemed to be safe to terminate if it is an unmanaged
+   * AM or its state has been saved in state store.
    * @return the flag which indicates whether this application is safe to
-   *         unregister.
+   *         terminate.
    */
-  boolean isAppSafeToUnregister();
+  boolean isAppSafeToTerminate();
 
   /**
    * Create the external user-facing state of ApplicationMaster from the

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Fri Dec 20 01:01:18 2013
@@ -37,5 +37,4 @@ public enum RMAppEventType {
   // Source: RMStateStore
   APP_NEW_SAVED,
   APP_UPDATE_SAVED,
-  APP_REMOVED
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Dec 20 01:01:18 2013
@@ -110,10 +110,14 @@ public class RMAppImpl implements RMApp,
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
+
+  // These stored states are only valid while app is KILLING or FINAL_SAVING.
+  private RMAppState stateBeforeKilling;
   private RMAppState stateBeforeFinalSaving;
   private RMAppEvent eventCausingFinalSaving;
   private RMAppState targetedFinalState;
   private RMAppState recoveredFinalState;
+
   Object transitionTodo;
 
   private static final StateMachineFactory<RMAppImpl,
@@ -166,10 +170,8 @@ public class RMAppImpl implements RMApp,
           new AppRejectedTransition(), RMAppState.FAILED))
     .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
         RMAppEventType.APP_ACCEPTED)
-    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.SUBMITTED, RMAppState.KILLING,
+        RMAppEventType.KILL,new KillAttemptTransition())
 
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -180,10 +182,8 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
+        RMAppEventType.KILL,new KillAttemptTransition())
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -200,10 +200,8 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
+        RMAppEventType.KILL, new KillAttemptTransition())
 
      // Transitions from FINAL_SAVING state
     .addTransition(RMAppState.FINAL_SAVING,
@@ -221,11 +219,27 @@ public class RMAppImpl implements RMApp,
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
-    .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-      EnumSet.of(RMAppEventType.NODE_UPDATE))
+      EnumSet.of(RMAppEventType.NODE_UPDATE,
+        // ignore Kill as we have already saved the final Finished state in
+        // state store.
+        RMAppEventType.KILL))
+
+     // Transitions from KILLING state
+    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_KILLED,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+        EnumSet.of(
+            RMAppEventType.NODE_UPDATE,
+            RMAppEventType.ATTEMPT_REGISTERED,
+            RMAppEventType.ATTEMPT_UNREGISTERED,
+            RMAppEventType.ATTEMPT_FINISHED,
+            RMAppEventType.ATTEMPT_FAILED,
+            RMAppEventType.APP_UPDATE_SAVED,
+            RMAppEventType.KILL))
 
      // Transitions from FINISHED state
      // ignorable transitions
@@ -249,7 +263,7 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
+            RMAppEventType.NODE_UPDATE))
 
      .installTopology();
 
@@ -419,6 +433,7 @@ public class RMAppImpl implements RMApp,
     case ACCEPTED:
     case RUNNING:
     case FINAL_SAVING:
+    case KILLING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
     case FINISHING:
@@ -681,7 +696,7 @@ public class RMAppImpl implements RMApp,
       }
 
       // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved。
+      // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
         app.createNewAttempt(true);
         return RMAppState.SUBMITTED;
@@ -811,7 +826,7 @@ public class RMAppImpl implements RMApp,
       RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
       diags = getAppAttemptFailedDiagnostics(failedEvent);
       break;
-    case KILL:
+    case ATTEMPT_KILLED:
       diags = getAppKilledDiagnostics();
       break;
     default:
@@ -901,7 +916,7 @@ public class RMAppImpl implements RMApp,
   private static class AppKilledTransition extends FinalTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.diagnostics.append("Application killed by user.");
+      app.diagnostics.append(getAppKilledDiagnostics());
       super.transition(app, event);
     };
   }
@@ -910,15 +925,16 @@ public class RMAppImpl implements RMApp,
     return "Application killed by user.";
   }
 
-  private static class KillAppAndAttemptTransition extends AppKilledTransition {
+  private static class KillAttemptTransition extends RMAppTransition {
     @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
-          RMAppAttemptEventType.KILL));
-      super.transition(app, event);
+      app.stateBeforeKilling = app.getState();
+      app.handler.handle(new RMAppAttemptEvent(app.currentAttempt
+        .getAppAttemptId(), RMAppAttemptEventType.KILL));
     }
   }
+
   private static final class AppRejectedTransition extends
       FinalTransition{
     public void transition(RMAppImpl app, RMAppEvent event) {
@@ -986,7 +1002,7 @@ public class RMAppImpl implements RMApp,
   }
 
   @Override
-  public boolean isAppSafeToUnregister() {
+  public boolean isAppSafeToTerminate() {
     RMAppState state = getState();
     return state.equals(RMAppState.FINISHING)
         || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
@@ -1003,6 +1019,9 @@ public class RMAppImpl implements RMApp,
     if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
       rmAppState = stateBeforeFinalSaving;
     }
+    if (rmAppState.equals(RMAppState.KILLING)) {
+      rmAppState = stateBeforeKilling;
+    }
     switch (rmAppState) {
     case NEW:
       return YarnApplicationState.NEW;

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Fri Dec 20 01:01:18 2013
@@ -28,5 +28,6 @@ public enum RMAppState {
   FINISHING,
   FINISHED,
   FAILED,
+  KILLING,
   KILLED
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Dec 20 01:01:18 2013
@@ -361,6 +361,8 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.STATUS_UPDATE,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
+            // ignore Kill as we have already saved the final Finished state in
+            // state store.
               RMAppAttemptEventType.KILL))
 
       // Transitions from FINISHED State

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Fri Dec 20 01:01:18 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.Applic
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -277,12 +278,10 @@ public class MockRM extends ResourceMana
         node.getState());
   }
 
-  public void killApp(ApplicationId appId) throws Exception {
+  public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
-    KillApplicationRequest req = Records
-        .newRecord(KillApplicationRequest.class);
-    req.setApplicationId(appId);
-    client.forceKillApplication(req);
+    KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
+    return client.forceKillApplication(req);
   }
 
   // from AMLauncher

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Fri Dec 20 01:01:18 2013
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -76,8 +77,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -414,10 +416,8 @@ public class TestRMRestart {
     MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
     // assert the previous AM state is loaded back on RM recovery.
-    RMApp recoveredApp =
-        rm2.getRMContext().getRMApps().get(app0.getApplicationId());
-    Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
-      .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
+
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     rm1.stop();
     rm2.stop();
   }
@@ -683,14 +683,14 @@ public class TestRMRestart {
     MemoryRMStateStore memStore = new MemoryRMStateStore() {
       @Override
       public synchronized void storeApplicationAttemptStateInternal(
-          String attemptIdStr,
+          ApplicationAttemptId attemptId,
           ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
 
       @Override
       public synchronized void updateApplicationAttemptStateInternal(
-          String attemptIdStr,
+          ApplicationAttemptId attemptId,
           ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
@@ -964,8 +964,8 @@ public class TestRMRestart {
     Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), 
                         attemptState.getMasterContainer().getId());
 
-    // Setting AMLivelinessMonitor interval to be 10 Secs. 
-    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
+    // Setting AMLivelinessMonitor interval to be 3 Secs.
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
     // start new RM   
     MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
@@ -1494,6 +1494,70 @@ public class TestRMRestart {
     Assert.assertTrue(rm1.getServiceState() == STATE.STOPPED);
   }
 
+  // Tests that a client killing an application can keep retrying until the
+  // app reaches the KILLED state, and that the attempt state is saved before
+  // the app state is saved.
+  @Test
+  public void testClientRetryOnKillingApplication() throws Exception {
+    MemoryRMStateStore memStore = new TestMemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    RMApp app1 =
+        rm1.submitApp(200, "name", "user", null, false, "default", 1, null,
+          "myType");
+    MockAM am1 = launchAM(app1, rm1, nm1);
+
+    KillApplicationResponse response;
+    int count = 0;
+    while (true) {
+      response = rm1.killApp(app1.getApplicationId());
+      if (response.getIsKillCompleted()) {
+        break;
+      }
+      Thread.sleep(100);
+      count++;
+    }
+    // we expect at least 2 calls to killApp, as the first killApp call always
+    // returns false.
+    Assert.assertTrue(count >= 1);
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+    Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt);
+    Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
+  }
+
+  public class TestMemoryRMStateStore extends MemoryRMStateStore {
+    int count = 0;
+    public int updateApp = 0;
+    public int updateAttempt = 0;
+
+    @Override
+    public void updateApplicationStateInternal(ApplicationId appId,
+        ApplicationStateDataPBImpl appStateData) throws Exception {
+      updateApp = ++count;
+      super.updateApplicationStateInternal(appId, appStateData);
+    }
+
+    @Override
+    public synchronized void
+        updateApplicationAttemptStateInternal(
+            ApplicationAttemptId attemptId,
+            ApplicationAttemptStateDataPBImpl attemptStateData)
+            throws Exception {
+      updateAttempt = ++count;
+      super.updateApplicationAttemptStateInternal(attemptId,
+        attemptStateData);
+    }
+  }
+
   public static class TestSecurityMockRM extends MockRM {
 
     public TestSecurityMockRM(Configuration conf, RMStateStore store) {

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Fri Dec 20 01:01:18 2013
@@ -79,22 +79,23 @@ public class TestResourceManager {
 
   @Test
   public void testResourceAllocation() throws IOException,
-      YarnException {
+      YarnException, InterruptedException {
     LOG.info("--- START: testResourceAllocation ---");
         
     final int memory = 4 * 1024;
+    final int vcores = 4;
     
     // Register node1
     String host1 = "host1";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = 
       registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(memory, 1));
+          Resources.createResource(memory, vcores));
     
     // Register node2
     String host2 = "host2";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = 
       registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(memory/2, 1));
+          Resources.createResource(memory/2, vcores/2));
 
     // Submit an application
     Application application = new Application("user1", resourceManager);
@@ -142,8 +143,10 @@ public class TestResourceManager {
     application.schedule();
     checkResourceUsage(nm1, nm2);
     
-    // Send a heartbeat to kick the tires on the Scheduler
+    // Send heartbeats to kick the tires on the Scheduler
     nodeUpdate(nm2);
+    nodeUpdate(nm2);
+    nodeUpdate(nm1);
     nodeUpdate(nm1);
     
     // Get allocations from the scheduler

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Fri Dec 20 01:01:18 2013
@@ -145,7 +145,7 @@ public abstract class MockAsm extends Mo
     }
 
     @Override
-    public boolean isAppSafeToUnregister() {
+    public boolean isAppSafeToTerminate() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Fri Dec 20 01:01:18 2013
@@ -234,6 +234,12 @@ public class RMStateStoreTestBase extend
     attempts.put(attemptIdRemoved, mockRemovedAttempt);
     store.removeApplication(mockRemovedApp);
 
+    // remove application directory recursively.
+    storeApp(store, appIdRemoved, submitTime, startTime);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", null, null, dispatcher);
+    store.removeApplication(mockRemovedApp);
+
     // let things settle down
     Thread.sleep(1000);
     store.close();
@@ -373,7 +379,30 @@ public class RMStateStoreTestBase extend
     Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
     Assert.assertEquals(sequenceNumber,
         secretManagerState.getDTSequenceNumber());
+
+    // check to delete delegationKey
+    store.removeRMDTMasterKey(key);
+    keySet.clear();
+    RMDTSecretManagerState noKeySecretManagerState =
+        store.loadState().getRMDTSecretManagerState();
+    Assert.assertEquals(token1, noKeySecretManagerState.getTokenState());
+    Assert.assertEquals(keySet, noKeySecretManagerState.getMasterKeyState());
+    Assert.assertEquals(sequenceNumber,
+        noKeySecretManagerState.getDTSequenceNumber());
+
+    // check to delete delegationToken
+    store.removeRMDelegationToken(dtId1, sequenceNumber);
+    RMDTSecretManagerState noKeyAndTokenSecretManagerState =
+        store.loadState().getRMDTSecretManagerState();
+    token1.clear();
+    Assert.assertEquals(token1,
+        noKeyAndTokenSecretManagerState.getTokenState());
+    Assert.assertEquals(keySet,
+        noKeyAndTokenSecretManagerState.getMasterKeyState());
+    Assert.assertEquals(sequenceNumber,
+        noKeySecretManagerState.getDTSequenceNumber());
     store.close();
+
   }
 
   private Token<AMRMTokenIdentifier> generateAMRMToken(

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Fri Dec 20 01:01:18 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -178,10 +179,11 @@ public class TestFSRMStateStore extends 
         @Override
         public void run() {
           try {
-            store.storeApplicationStateInternal("application1",
-              (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
-                .newApplicationStateData(111, 111, "user", null,
-                  RMAppState.ACCEPTED, "diagnostics", 333));
+            store.storeApplicationStateInternal(
+                ApplicationId.newInstance(100L, 1),
+                (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
+                    .newApplicationStateData(111, 111, "user", null,
+                        RMAppState.ACCEPTED, "diagnostics", 333));
           } catch (Exception e) {
             // TODO 0 datanode exception will not be retried by dfs client, fix
             // that separately.

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Fri Dec 20 01:01:18 2013
@@ -218,7 +218,7 @@ public class MockRMApp implements RMApp 
   }
 
   @Override
-  public boolean isAppSafeToUnregister() {
+  public boolean isAppSafeToTerminate() {
     return true;
   }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Fri Dec 20 01:01:18 2013
@@ -301,12 +301,9 @@ public class TestRMAppTransitions {
 
   private void assertAppAndAttemptKilled(RMApp application)
       throws InterruptedException {
+    sendAttemptUpdateSavedEvent(application);
     sendAppUpdateSavedEvent(application);
     assertKilled(application);
-    // send attempt final state saved event.
-    application.getCurrentAppAttempt().handle(
-      new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
-        .getAppAttemptId(), null));
     Assert.assertEquals(RMAppAttemptState.KILLED, application
       .getCurrentAppAttempt().getAppAttemptState());
     assertAppFinalStateSaved(application);
@@ -329,6 +326,12 @@ public class TestRMAppTransitions {
     rmDispatcher.await();
   }
 
+  private void sendAttemptUpdateSavedEvent(RMApp application) {
+    application.getCurrentAppAttempt().handle(
+      new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
+        .getAppAttemptId(), null));
+  }
+
   protected RMApp testCreateAppNewSaving(
       ApplicationSubmissionContext submissionContext) throws IOException {
   RMApp application = createNewTestApp(submissionContext);
@@ -624,11 +627,12 @@ public class TestRMAppTransitions {
     rmDispatcher.await();
 
     // Ignore Attempt_Finished if we were supposed to go to Finished.
-    assertAppState(RMAppState.FINAL_SAVING, application);
+    assertAppState(RMAppState.KILLING, application);
     RMAppEvent finishEvent =
         new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
     application.handle(finishEvent);
-    assertAppState(RMAppState.FINAL_SAVING, application);
+    assertAppState(RMAppState.KILLING, application);
+    sendAttemptUpdateSavedEvent(application);
     sendAppUpdateSavedEvent(application);
     assertKilled(application);
   }
@@ -686,8 +690,8 @@ public class TestRMAppTransitions {
   }
 
   @Test
-  public void testAppFinishingKill() throws IOException {
-    LOG.info("--- START: testAppFinishedFinished ---");
+  public void testAppAtFinishingIgnoreKill() throws IOException {
+    LOG.info("--- START: testAppAtFinishingIgnoreKill ---");
 
     RMApp application = testCreateAppFinishing(null);
     // FINISHING => FINISHED event RMAppEventType.KILL
@@ -695,7 +699,7 @@ public class TestRMAppTransitions {
         new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-    assertAppState(RMAppState.FINISHED, application);
+    assertAppState(RMAppState.FINISHING, application);
   }
 
   // While App is at FINAL_SAVING, Attempt_Finished event may come before
@@ -780,6 +784,7 @@ public class TestRMAppTransitions {
         new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
+    sendAttemptUpdateSavedEvent(application);
     sendAppUpdateSavedEvent(application);
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
@@ -801,14 +806,6 @@ public class TestRMAppTransitions {
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
 
-    // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED
-    event = 
-        new RMAppEvent(application.getApplicationId(), 
-            RMAppEventType.ATTEMPT_KILLED);
-    application.handle(event);
-    rmDispatcher.await();
-    assertTimesAtFinish(application);
-    assertAppState(RMAppState.KILLED, application);
 
     // KILLED => KILLED event RMAppEventType.KILL
     event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Fri Dec 20 01:01:18 2013
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -38,6 +39,7 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -65,6 +67,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
+import static org.junit.Assert.fail;
+
 /**
  * Embedded Yarn minicluster for testcases that need to interact with a cluster.
  * <p/>
@@ -91,9 +95,11 @@ public class MiniYARNCluster extends Com
 
   private NodeManager[] nodeManagers;
   private ResourceManager[] resourceManagers;
+  private String[] rmIds;
+
+  private boolean useFixedPorts;
+  private boolean useRpc = false;
 
-  private ResourceManagerWrapper resourceManagerWrapper;
-  
   private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
       new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
   
@@ -163,15 +169,7 @@ public class MiniYARNCluster extends Com
     }
 
     resourceManagers = new ResourceManager[numResourceManagers];
-    for (int i = 0; i < numResourceManagers; i++) {
-      resourceManagers[i] = new ResourceManager();
-      addService(new ResourceManagerWrapper(i));
-    }
-    nodeManagers = new CustomNodeManager[numNodeManagers];
-    for(int index = 0; index < numNodeManagers; index++) {
-      addService(new NodeManagerWrapper(index));
-      nodeManagers[index] = new CustomNodeManager();
-    }
+    nodeManagers = new NodeManager[numNodeManagers];
   }
 
   /**
@@ -185,20 +183,45 @@ public class MiniYARNCluster extends Com
     this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
   }
 
-    @Override
+  @Override
   public void serviceInit(Configuration conf) throws Exception {
+    useFixedPorts = conf.getBoolean(
+        YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
+        YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
+    useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
+        YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
+
+    if (useRpc && !useFixedPorts) {
+      throw new YarnRuntimeException("Invalid configuration!" +
+          " Minicluster can use rpc only when configured to use fixed ports");
+    }
+
     if (resourceManagers.length > 1) {
       conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
-
-      StringBuilder rmIds = new StringBuilder();
-      for (int i = 0; i < resourceManagers.length; i++) {
-        if (i != 0) {
-          rmIds.append(",");
+      if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
+        StringBuilder rmIds = new StringBuilder();
+        for (int i = 0; i < resourceManagers.length; i++) {
+          if (i != 0) {
+            rmIds.append(",");
+          }
+          rmIds.append("rm" + i);
         }
-        rmIds.append("rm" + i);
+        conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
       }
-      conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
+      Collection<String> rmIdsCollection = HAUtil.getRMHAIds(conf);
+      rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]);
     }
+
+    for (int i = 0; i < resourceManagers.length; i++) {
+      resourceManagers[i] = new ResourceManager();
+      addService(new ResourceManagerWrapper(i));
+    }
+    for(int index = 0; index < nodeManagers.length; index++) {
+      nodeManagers[index] =
+          useRpc ? new CustomNodeManager() : new ShortCircuitedNodeManager();
+      addService(new NodeManagerWrapper(index));
+    }
+
     super.serviceInit(
         conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
   }
@@ -213,11 +236,12 @@ public class MiniYARNCluster extends Com
    *
    * In an non-HA cluster, return the index of the only RM.
    *
-   * @return index of the active RM
+   * @return index of the active RM or -1 if none of them transition to
+   * active even after 5 seconds of waiting
    */
   @InterfaceAudience.Private
   @VisibleForTesting
-  int getActiveRMIndex() {
+  public int getActiveRMIndex() {
     if (resourceManagers.length == 1) {
       return 0;
     }
@@ -292,9 +316,7 @@ public class MiniYARNCluster extends Com
     }
 
     private void setHARMConfiguration(Configuration conf) {
-      String rmId = "rm" + index;
       String hostname = MiniYARNCluster.getHostname();
-      conf.set(YarnConfiguration.RM_HA_ID, rmId);
       for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
         for (String id : HAUtil.getRMHAIds(conf)) {
           conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0");
@@ -306,15 +328,17 @@ public class MiniYARNCluster extends Com
     protected synchronized void serviceInit(Configuration conf)
         throws Exception {
       conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
-      if (!conf.getBoolean(
-          YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
-          YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
+
+      if (!useFixedPorts) {
         if (HAUtil.isHAEnabled(conf)) {
           setHARMConfiguration(conf);
         } else {
           setNonHARMConfiguration(conf);
         }
       }
+      if (HAUtil.isHAEnabled(conf)) {
+        conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
+      }
       resourceManagers[index].init(conf);
       resourceManagers[index].getRMContext().getDispatcher().register
           (RMAppAttemptEventType.class,
@@ -500,7 +524,9 @@ public class MiniYARNCluster extends Com
     protected void doSecureLogin() throws IOException {
       // Don't try to login using keytab in the testcase.
     }
+  }
 
+  private class ShortCircuitedNodeManager extends CustomNodeManager {
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
@@ -553,4 +579,28 @@ public class MiniYARNCluster extends Com
       };
     }
   }
+
+  /**
+   * Wait for all the NodeManagers to connect to the ResourceManager.
+   *
+   * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds.
+   * @return true if all NodeManagers connect to the (Active)
+   * ResourceManager, false otherwise.
+   * @throws YarnException
+   * @throws InterruptedException
+   */
+  public boolean waitForNodeManagersToConnect(long timeout)
+      throws YarnException, InterruptedException {
+    ResourceManager rm = getResourceManager();
+    GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
+
+    for (int i = 0; i < timeout / 100; i++) {
+      if (nodeManagers.length == rm.getClientRMService().getClusterMetrics(req)
+          .getClusterMetrics().getNumNodeManagers()) {
+        return true;
+      }
+      Thread.sleep(100);
+    }
+    return false;
+  }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java Fri Dec 20 01:01:18 2013
@@ -33,6 +33,7 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestMiniYARNClusterForHA {
@@ -56,16 +57,7 @@ public class TestMiniYARNClusterForHA {
 
   @Test
   public void testClusterWorks() throws YarnException, InterruptedException {
-    ResourceManager rm = cluster.getResourceManager(0);
-    GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
-
-    for (int i = 0; i < 600; i++) {
-      if (1 == rm.getClientRMService().getClusterMetrics(req)
-          .getClusterMetrics().getNumNodeManagers()) {
-        return;
-      }
-      Thread.sleep(100);
-    }
-    fail("NodeManager never registered with the RM");
+    assertTrue("NMs fail to connect to the RM",
+        cluster.waitForNodeManagersToConnect(5000));
   }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/pom.xml?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/pom.xml (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/pom.xml Fri Dec 20 01:01:18 2013
@@ -112,6 +112,11 @@
       <artifactId>guice</artifactId>
     </dependency>
     <dependency>
+        <groupId>cglib</groupId>
+        <artifactId>cglib</artifactId>
+        <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>com.sun.jersey.jersey-test-framework</groupId>
       <artifactId>jersey-test-framework-core</artifactId>
       <scope>test</scope>

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/pom.xml?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/pom.xml (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/pom.xml Fri Dec 20 01:01:18 2013
@@ -133,6 +133,10 @@
       <artifactId>guice</artifactId>
     </dependency>
     <dependency>
+        <groupId>cglib</groupId>
+        <artifactId>cglib</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.sun.jersey</groupId>
       <artifactId>jersey-server</artifactId>
     </dependency>



Mime
View raw message