hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1552210 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ hadoop-yarn/hadoop-yarn-server/ha...
Date Thu, 19 Dec 2013 02:33:33 GMT
Author: vinodkv
Date: Thu Dec 19 02:33:33 2013
New Revision: 1552210

URL: http://svn.apache.org/r1552210
Log:
YARN-1307. Redesign znode structure for Zookeeper based RM state-store for better organization
and scalability. Contributed by Tsuyoshi OZAWA.
svn merge --ignore-ancestry -c 1552209 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Dec 19 02:33:33 2013
@@ -166,6 +166,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1446. Changed client API to retry killing application till RM
     acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv)
 
+    YARN-1307. Redesign znode structure for Zookeeper based RM state-store for
+    better organization and scalability. (Tsuyoshi OZAWA via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
Thu Dec 19 02:33:33 2013
@@ -287,11 +287,12 @@ public class FileSystemRMStateStore exte
   }
 
   @Override
-  public synchronized void storeApplicationStateInternal(String appId,
+  public synchronized void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    Path appDirPath = getAppDir(rmAppRoot, appId);
+    String appIdStr = appId.toString();
+    Path appDirPath = getAppDir(rmAppRoot, appIdStr);
     fs.mkdirs(appDirPath);
-    Path nodeCreatePath = getNodePath(appDirPath, appId);
+    Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
 
     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -306,10 +307,11 @@ public class FileSystemRMStateStore exte
   }
 
   @Override
-  public synchronized void updateApplicationStateInternal(String appId,
+  public synchronized void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    Path appDirPath = getAppDir(rmAppRoot, appId);
-    Path nodeCreatePath = getNodePath(appDirPath, appId);
+    String appIdStr = appId.toString();
+    Path appDirPath = getAppDir(rmAppRoot, appIdStr);
+    Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
 
     LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath);
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -325,14 +327,13 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    ApplicationAttemptId appAttemptId =
-        ConverterUtils.toApplicationAttemptId(attemptId);
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
-    Path nodeCreatePath = getNodePath(appDirPath, attemptId);
-    LOG.info("Storing info for attempt: " + attemptId + " at: "
+    Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+    LOG.info("Storing info for attempt: " + appAttemptId + " at: "
         + nodeCreatePath);
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
     try {
@@ -340,21 +341,20 @@ public class FileSystemRMStateStore exte
       // based on whether we have lost the right to write to FS
       writeFile(nodeCreatePath, attemptStateData);
     } catch (Exception e) {
-      LOG.info("Error storing info for attempt: " + attemptId, e);
+      LOG.info("Error storing info for attempt: " + appAttemptId, e);
       throw e;
     }
   }
 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    ApplicationAttemptId appAttemptId =
-        ConverterUtils.toApplicationAttemptId(attemptId);
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
-    Path nodeCreatePath = getNodePath(appDirPath, attemptId);
-    LOG.info("Updating info for attempt: " + attemptId + " at: "
+    Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+    LOG.info("Updating info for attempt: " + appAttemptId + " at: "
         + nodeCreatePath);
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
     try {
@@ -362,7 +362,7 @@ public class FileSystemRMStateStore exte
       // based on whether we have lost the right to write to FS
       updateFile(nodeCreatePath, attemptStateData);
     } catch (Exception e) {
-      LOG.info("Error updating info for attempt: " + attemptId, e);
+      LOG.info("Error updating info for attempt: " + appAttemptId, e);
       throw e;
     }
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
Thu Dec 19 02:33:33 2013
@@ -80,7 +80,7 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
-  public void storeApplicationStateInternal(String appId, 
+  public void storeApplicationStateInternal(ApplicationId appId,
                                      ApplicationStateDataPBImpl appStateData)
       throws Exception {
     ApplicationState appState =
@@ -88,11 +88,11 @@ public class MemoryRMStateStore extends 
           appStateData.getStartTime(),
           appStateData.getApplicationSubmissionContext(),
           appStateData.getUser());
-    state.appState.put(appState.getAppId(), appState);
+    state.appState.put(appId, appState);
   }
 
   @Override
-  public void updateApplicationStateInternal(String appId,
+  public void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception {
     ApplicationState updatedAppState =
         new ApplicationState(appStateData.getSubmitTime(),
@@ -102,21 +102,19 @@ public class MemoryRMStateStore extends 
           appStateData.getDiagnostics(), appStateData.getFinishTime());
     LOG.info("Updating final state " + appStateData.getState() + " for app: "
         + appId);
-    ApplicationId applicationId = updatedAppState.getAppId();
-    if (state.appState.get(applicationId) != null) {
+    if (state.appState.get(appId) != null) {
       // add the earlier attempts back
       updatedAppState.attempts
-        .putAll(state.appState.get(applicationId).attempts);
+        .putAll(state.appState.get(appId).attempts);
     }
-    state.appState.put(applicationId, updatedAppState);
+    state.appState.put(appId, updatedAppState);
   }
 
   @Override
-  public synchronized void storeApplicationAttemptStateInternal(String attemptIdStr, 
-                            ApplicationAttemptStateDataPBImpl attemptStateData)
-                            throws Exception {
-    ApplicationAttemptId attemptId = ConverterUtils
-                                        .toApplicationAttemptId(attemptIdStr);
+  public synchronized void storeApplicationAttemptStateInternal(
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData)
+      throws Exception {
     Credentials credentials = null;
     if(attemptStateData.getAppAttemptTokens() != null){
       DataInputByteBuffer dibb = new DataInputByteBuffer();
@@ -125,7 +123,7 @@ public class MemoryRMStateStore extends 
       credentials.readTokenStorageStream(dibb);
     }
     ApplicationAttemptState attemptState =
-        new ApplicationAttemptState(attemptId,
+        new ApplicationAttemptState(appAttemptId,
           attemptStateData.getMasterContainer(), credentials,
           attemptStateData.getStartTime());
 
@@ -139,10 +137,9 @@ public class MemoryRMStateStore extends 
 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
-      String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData)
       throws Exception {
-    ApplicationAttemptId attemptId =
-        ConverterUtils.toApplicationAttemptId(attemptIdStr);
     Credentials credentials = null;
     if (attemptStateData.getAppAttemptTokens() != null) {
       DataInputByteBuffer dibb = new DataInputByteBuffer();
@@ -151,7 +148,7 @@ public class MemoryRMStateStore extends 
       credentials.readTokenStorageStream(dibb);
     }
     ApplicationAttemptState updatedAttemptState =
-        new ApplicationAttemptState(attemptId,
+        new ApplicationAttemptState(appAttemptId,
           attemptStateData.getMasterContainer(), credentials,
           attemptStateData.getStartTime(), attemptStateData.getState(),
           attemptStateData.getFinalTrackingUrl(),

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
Thu Dec 19 02:33:33 2013
@@ -22,6 +22,8 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
@@ -51,13 +53,13 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected void storeApplicationStateInternal(String appId,
+  protected void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception {
     // Do nothing
   }
 
   @Override
-  protected void storeApplicationAttemptStateInternal(String attemptId,
+  protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
     // Do nothing
   }
@@ -92,13 +94,13 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected void updateApplicationStateInternal(String appId,
+  protected void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception {
     // Do nothing 
   }
 
   @Override
-  protected void updateApplicationAttemptStateInternal(String attemptId,
+  protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
Thu Dec 19 02:33:33 2013
@@ -387,10 +387,10 @@ public abstract class RMStateStore exten
    * Derived classes must implement this method to store the state of an 
    * application.
    */
-  protected abstract void storeApplicationStateInternal(String appId,
+  protected abstract void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception;
 
-  protected abstract void updateApplicationStateInternal(String appId,
+  protected abstract void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception;
   
   @SuppressWarnings("unchecked")
@@ -424,10 +424,12 @@ public abstract class RMStateStore exten
    * Derived classes must implement this method to store the state of an 
    * application attempt
    */
-  protected abstract void storeApplicationAttemptStateInternal(String attemptId,
+  protected abstract void storeApplicationAttemptStateInternal(
+      ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
 
-  protected abstract void updateApplicationAttemptStateInternal(String attemptId,
+  protected abstract void updateApplicationAttemptStateInternal(
+      ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
 
   /**
@@ -592,11 +594,11 @@ public abstract class RMStateStore exten
       LOG.info("Storing info for app: " + appId);
       try {
         if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-          storeApplicationStateInternal(appId.toString(), appStateData);
+          storeApplicationStateInternal(appId, appStateData);
           notifyDoneStoringApplication(appId, storedException);
         } else {
           assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-          updateApplicationStateInternal(appId.toString(), appStateData);
+          updateApplicationStateInternal(appId, appStateData);
           notifyDoneUpdatingApplication(appId, storedException);
         }
       } catch (Exception e) {
@@ -637,15 +639,15 @@ public abstract class RMStateStore exten
           LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
         }
         if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-          storeApplicationAttemptStateInternal(attemptState.getAttemptId()
-            .toString(), attemptStateData);
+          storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+              attemptStateData);
           notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
               storedException);
         } else {
           assert event.getType().equals(
             RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-          updateApplicationAttemptStateInternal(attemptState.getAttemptId()
-            .toString(), attemptStateData);
+          updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+              attemptStateData);
           notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
               storedException);
         }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
Thu Dec 19 02:33:33 2013
@@ -78,16 +78,51 @@ public class ZKRMStateStore extends RMSt
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
   protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
       .newInstance(1, 0);
+  private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
+      "RMDelegationTokensRoot";
+  private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
+      "RMDTSequentialNumber";
+  private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
+      "RMDTMasterKeysRoot";
   private int numRetries;
 
   private String zkHostPort = null;
   private int zkSessionTimeout;
   private long zkRetryInterval;
   private List<ACL> zkAcl;
+
+  /**
+   *
+   * ROOT_DIR_PATH
+   * |--- VERSION_INFO
+   * |--- RM_ZK_FENCING_LOCK
+   * |--- RM_APP_ROOT
+   * |     |----- (#ApplicationId1)
+   * |     |        |----- (#ApplicationAttemptIds)
+   * |     |
+   * |     |----- (#ApplicationId2)
+   * |     |       |----- (#ApplicationAttemptIds)
+   * |     ....
+   * |
+   * |--- RM_DT_SECRET_MANAGER_ROOT
+   *        |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
+   *        |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
+   *        |       |----- Token_1
+   *        |       |----- Token_2
+   *        |       ....
+   *        |
+   *        |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
+   *        |      |----- Key_1
+   *        |      |----- Key_2
+   *                ....
+   *
+   */
   private String zkRootNodePath;
-  private String rmDTSecretManagerRoot;
   private String rmAppRoot;
-  private String dtSequenceNumberPath = null;
+  private String rmDTSecretManagerRoot;
+  private String dtMasterKeysRootPath;
+  private String delegationTokensRootPath;
+  private String dtSequenceNumberPath;
 
   @VisibleForTesting
   protected String znodeWorkingPath;
@@ -178,12 +213,11 @@ public class ZKRMStateStore extends RMSt
       throw bafe;
     }
 
-    zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
-    rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
-    rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+    zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
+    rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
 
     /* Initialize fencing related paths, acls, and ops */
-    fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK;
+    fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
     createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
         CreateMode.PERSISTENT);
     deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
@@ -204,6 +238,15 @@ public class ZKRMStateStore extends RMSt
         zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
       }
     }
+
+    rmDTSecretManagerRoot =
+        getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
+    dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
+    delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
+    dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
   }
 
   @Override
@@ -217,8 +260,11 @@ public class ZKRMStateStore extends RMSt
     if (HAUtil.isHAEnabled(getConfig())){
       fence();
     }
-    createRootDir(rmDTSecretManagerRoot);
     createRootDir(rmAppRoot);
+    createRootDir(rmDTSecretManagerRoot);
+    createRootDir(dtMasterKeysRootPath);
+    createRootDir(delegationTokensRootPath);
+    createRootDir(dtSequenceNumberPath);
   }
 
   private void createRootDir(final String rootPath) throws Exception {
@@ -350,26 +396,69 @@ public class ZKRMStateStore extends RMSt
 
   private synchronized void loadRMDTSecretManagerState(RMState rmState)
       throws Exception {
-    List<String> childNodes =
-        getChildrenWithRetries(rmDTSecretManagerRoot, true);
+    loadRMDelegationKeyState(rmState);
+    loadRMSequentialNumberState(rmState);
+    loadRMDelegationTokenState(rmState);
+  }
 
+  private void loadRMDelegationKeyState(RMState rmState) throws Exception {
+    List<String> childNodes =
+        getChildrenWithRetries(dtMasterKeysRootPath, true);
     for (String childNodeName : childNodes) {
-      if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
-        rmState.rmSecretManagerState.dtSequenceNumber =
-            Integer.parseInt(childNodeName.split("_")[1]);
+      String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
+      byte[] childData = getDataWithRetries(childNodePath, true);
+
+      if (childData == null) {
+        LOG.warn("Content of " + childNodePath + " is broken.");
         continue;
       }
-      String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
-      byte[] childData = getDataWithRetries(childNodePath, true);
 
       ByteArrayInputStream is = new ByteArrayInputStream(childData);
       DataInputStream fsIn = new DataInputStream(is);
+
       try {
         if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
           DelegationKey key = new DelegationKey();
           key.readFields(fsIn);
           rmState.rmSecretManagerState.masterKeyState.add(key);
-        } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+        }
+      } finally {
+        is.close();
+      }
+    }
+  }
+
+  private void loadRMSequentialNumberState(RMState rmState) throws Exception {
+    byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
+    if (seqData != null) {
+      ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
+      DataInputStream seqIn = new DataInputStream(seqIs);
+
+      try {
+        rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
+      } finally {
+        seqIn.close();
+      }
+    }
+  }
+
+  private void loadRMDelegationTokenState(RMState rmState) throws Exception {
+    List<String> childNodes = zkClient.getChildren(delegationTokensRootPath, true);
+    for (String childNodeName : childNodes) {
+      String childNodePath =
+          getNodePath(delegationTokensRootPath, childNodeName);
+      byte[] childData = getDataWithRetries(childNodePath, true);
+
+      if (childData == null) {
+        LOG.warn("Content of " + childNodePath + " is broken.");
+        continue;
+      }
+
+      ByteArrayInputStream is = new ByteArrayInputStream(childData);
+      DataInputStream fsIn = new DataInputStream(is);
+
+      try {
+        if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
           RMDelegationTokenIdentifier identifier =
               new RMDelegationTokenIdentifier();
           identifier.readFields(fsIn);
@@ -385,8 +474,6 @@ public class ZKRMStateStore extends RMSt
 
   private synchronized void loadRMAppState(RMState rmState) throws Exception {
     List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
-    List<ApplicationAttemptState> attempts =
-        new ArrayList<ApplicationAttemptState>();
     for (String childNodeName : childNodes) {
       String childNodePath = getNodePath(rmAppRoot, childNodeName);
       byte[] childData = getDataWithRetries(childNodePath, true);
@@ -411,17 +498,28 @@ public class ZKRMStateStore extends RMSt
               "from the application id");
         }
         rmState.appState.put(appId, appState);
-      } else if (childNodeName
-          .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
-        // attempt
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Loading application attempt from znode: " + childNodeName);
-        }
+        loadApplicationAttemptState(appState, appId);
+      } else {
+        LOG.info("Unknown child node with name: " + childNodeName);
+      }
+    }
+  }
+
+  private void loadApplicationAttemptState(ApplicationState appState,
+      ApplicationId appId)
+      throws Exception {
+    String appPath = getNodePath(rmAppRoot, appId.toString());
+    List<String> attempts = getChildrenWithRetries(appPath, false);
+    for (String attemptIDStr : attempts) {
+      if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+        String attemptPath = getNodePath(appPath, attemptIDStr);
+        byte[] attemptData = getDataWithRetries(attemptPath, true);
+
         ApplicationAttemptId attemptId =
-            ConverterUtils.toApplicationAttemptId(childNodeName);
+            ConverterUtils.toApplicationAttemptId(attemptIDStr);
         ApplicationAttemptStateDataPBImpl attemptStateData =
             new ApplicationAttemptStateDataPBImpl(
-                ApplicationAttemptStateDataProto.parseFrom(childData));
+                ApplicationAttemptStateDataProto.parseFrom(attemptData));
         Credentials credentials = null;
         if (attemptStateData.getAppAttemptTokens() != null) {
           credentials = new Credentials();
@@ -429,47 +527,26 @@ public class ZKRMStateStore extends RMSt
           dibb.reset(attemptStateData.getAppAttemptTokens());
           credentials.readTokenStorageStream(dibb);
         }
+
         ApplicationAttemptState attemptState =
             new ApplicationAttemptState(attemptId,
-              attemptStateData.getMasterContainer(), credentials,
-              attemptStateData.getStartTime(),
-              attemptStateData.getState(),
-              attemptStateData.getFinalTrackingUrl(),
-              attemptStateData.getDiagnostics(),
-              attemptStateData.getFinalApplicationStatus());
-        if (!attemptId.equals(attemptState.getAttemptId())) {
-          throw new YarnRuntimeException("The child node name is different " +
-              "from the application attempt id");
-        }
-        attempts.add(attemptState);
-      } else {
-        LOG.info("Unknown child node with name: " + childNodeName);
-      }
-    }
+                attemptStateData.getMasterContainer(), credentials,
+                attemptStateData.getStartTime(),
+                attemptStateData.getState(),
+                attemptStateData.getFinalTrackingUrl(),
+                attemptStateData.getDiagnostics(),
+                attemptStateData.getFinalApplicationStatus());
 
-    // go through all attempts and add them to their apps
-    for (ApplicationAttemptState attemptState : attempts) {
-      ApplicationId appId = attemptState.getAttemptId().getApplicationId();
-      ApplicationState appState = rmState.appState.get(appId);
-      if (appState != null) {
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
-      } else {
-        // the application znode may have been removed when the application
-        // completed but the RM might have stopped before it could remove the
-        // application attempt znodes
-        LOG.info("Application node not found for attempt: "
-            + attemptState.getAttemptId());
-        deleteWithRetries(
-            getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
       }
     }
     LOG.info("Done Loading applications from ZK state store");
   }
 
   @Override
-  public synchronized void storeApplicationStateInternal(String appId,
+  public synchronized void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, appId);
+    String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -481,25 +558,29 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  public synchronized void updateApplicationStateInternal(String appId,
+  public synchronized void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, appId);
+    String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing final state info for app: " + appId + " at: "
-          + nodeCreatePath);
+          + nodeUpdatePath);
     }
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
-    setDataWithRetries(nodeCreatePath, appStateData, 0);
+    setDataWithRetries(nodeUpdatePath, appStateData, 0);
   }
 
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    String appDirPath = getNodePath(rmAppRoot,
+        appAttemptId.getApplicationId().toString());
+    String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing info for attempt: " + attemptId + " at: "
+      LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
           + nodeCreatePath);
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -509,31 +590,36 @@ public class ZKRMStateStore extends RMSt
 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    String appAttemptIdStr = appAttemptId.toString();
+    String appDirPath = getNodePath(rmAppRoot, appIdStr);
+    String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing final state info for attempt: " + attemptId + " at: "
-          + nodeCreatePath);
+      LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+          + " at: " + nodeUpdatePath);
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-    setDataWithRetries(nodeCreatePath, attemptStateData, 0);
+    setDataWithRetries(nodeUpdatePath, attemptStateData, 0);
   }
 
   @Override
   public synchronized void removeApplicationStateInternal(ApplicationState appState)
       throws Exception {
     String appId = appState.getAppId().toString();
-    String nodeRemovePath = getNodePath(rmAppRoot, appId);
+    String appIdRemovePath = getNodePath(rmAppRoot, appId);
     ArrayList<Op> opList = new ArrayList<Op>();
-    opList.add(Op.delete(nodeRemovePath, -1));
 
     for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
-      String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
+      String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
       opList.add(Op.delete(attemptRemovePath, -1));
     }
+    opList.add(Op.delete(appIdRemovePath, -1));
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
+      LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
           + " and its attempts.");
     }
     doMultiWithRetries(opList);
@@ -546,38 +632,37 @@ public class ZKRMStateStore extends RMSt
     ArrayList<Op> opList = new ArrayList<Op>();
     // store RM delegation token
     String nodeCreatePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
-    ByteArrayOutputStream os = new ByteArrayOutputStream();
-    DataOutputStream fsOut = new DataOutputStream(os);
+    ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
+    DataOutputStream tokenOut = new DataOutputStream(tokenOs);
+    ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+    DataOutputStream seqOut = new DataOutputStream(seqOs);
+
     try {
-      rmDTIdentifier.write(fsOut);
-      fsOut.writeLong(renewDate);
+      rmDTIdentifier.write(tokenOut);
+      tokenOut.writeLong(renewDate);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Storing RMDelegationToken_" +
             rmDTIdentifier.getSequenceNumber());
       }
-      opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
+
+      opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
           CreateMode.PERSISTENT));
-    } finally {
-      os.close();
-    }
 
-    // store sequence number
-    String latestSequenceNumberPath =
-        getNodePath(rmDTSecretManagerRoot,
-            DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
-          latestSequenceNumber);
-    }
 
-    if (dtSequenceNumberPath != null) {
-      opList.add(Op.delete(dtSequenceNumberPath, -1));
+     seqOut.writeInt(latestSequenceNumber);
+     if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing " + dtSequenceNumberPath +
+            ". SequenceNumber: " + latestSequenceNumber);
+      }
+
+     opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
+    } finally {
+      tokenOs.close();
+      seqOs.close();
     }
-    opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
-        CreateMode.PERSISTENT));
-    dtSequenceNumberPath = latestSequenceNumberPath;
+
     doMultiWithRetries(opList);
   }
 
@@ -585,7 +670,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void removeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
     String nodeRemovePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationToken_"
@@ -598,7 +683,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void storeRMDTMasterKeyState(
       DelegationKey delegationKey) throws Exception {
     String nodeCreatePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
             + delegationKey.getKeyId());
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     DataOutputStream fsOut = new DataOutputStream(os);
@@ -618,7 +703,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void removeRMDTMasterKeyState(
       DelegationKey delegationKey) throws Exception {
     String nodeRemovePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
             + delegationKey.getKeyId());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
@@ -757,8 +842,7 @@ public class ZKRMStateStore extends RMSt
     return new ZKAction<byte[]>() {
       @Override
       public byte[] run() throws KeeperException, InterruptedException {
-        Stat stat = new Stat();
-        return zkClient.getData(path, watch, stat);
+        return zkClient.getData(path, watch, null);
       }
     }.runWithRetries();
   }
@@ -865,4 +949,5 @@ public class ZKRMStateStore extends RMSt
     zk.register(new ForwardingWatcher());
     return zk;
   }
+
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
Thu Dec 19 02:33:33 2013
@@ -683,14 +683,14 @@ public class TestRMRestart {
     MemoryRMStateStore memStore = new MemoryRMStateStore() {
       @Override
       public synchronized void storeApplicationAttemptStateInternal(
-          String attemptIdStr,
+          ApplicationAttemptId attemptId,
           ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
 
       @Override
       public synchronized void updateApplicationAttemptStateInternal(
-          String attemptIdStr,
+          ApplicationAttemptId attemptId,
           ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
@@ -1540,7 +1540,7 @@ public class TestRMRestart {
     public int updateAttempt = 0;
 
     @Override
-    public void updateApplicationStateInternal(String appId,
+    public void updateApplicationStateInternal(ApplicationId appId,
         ApplicationStateDataPBImpl appStateData) throws Exception {
       updateApp = ++count;
       super.updateApplicationStateInternal(appId, appStateData);
@@ -1548,11 +1548,12 @@ public class TestRMRestart {
 
     @Override
     public synchronized void
-        updateApplicationAttemptStateInternal(String attemptIdStr,
+        updateApplicationAttemptStateInternal(
+            ApplicationAttemptId attemptId,
             ApplicationAttemptStateDataPBImpl attemptStateData)
             throws Exception {
       updateAttempt = ++count;
-      super.updateApplicationAttemptStateInternal(attemptIdStr,
+      super.updateApplicationAttemptStateInternal(attemptId,
         attemptStateData);
     }
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
Thu Dec 19 02:33:33 2013
@@ -234,6 +234,12 @@ public class RMStateStoreTestBase extend
     attempts.put(attemptIdRemoved, mockRemovedAttempt);
     store.removeApplication(mockRemovedApp);
 
+    // remove application directory recursively.
+    storeApp(store, appIdRemoved, submitTime, startTime);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", null, null, dispatcher);
+    store.removeApplication(mockRemovedApp);
+
     // let things settle down
     Thread.sleep(1000);
     store.close();
@@ -373,7 +379,30 @@ public class RMStateStoreTestBase extend
     Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
     Assert.assertEquals(sequenceNumber,
         secretManagerState.getDTSequenceNumber());
+
+    // check to delete delegationKey
+    store.removeRMDTMasterKey(key);
+    keySet.clear();
+    RMDTSecretManagerState noKeySecretManagerState =
+        store.loadState().getRMDTSecretManagerState();
+    Assert.assertEquals(token1, noKeySecretManagerState.getTokenState());
+    Assert.assertEquals(keySet, noKeySecretManagerState.getMasterKeyState());
+    Assert.assertEquals(sequenceNumber,
+        noKeySecretManagerState.getDTSequenceNumber());
+
+    // check to delete delegationToken
+    store.removeRMDelegationToken(dtId1, sequenceNumber);
+    RMDTSecretManagerState noKeyAndTokenSecretManagerState =
+        store.loadState().getRMDTSecretManagerState();
+    token1.clear();
+    Assert.assertEquals(token1,
+        noKeyAndTokenSecretManagerState.getTokenState());
+    Assert.assertEquals(keySet,
+        noKeyAndTokenSecretManagerState.getMasterKeyState());
+    Assert.assertEquals(sequenceNumber,
+        noKeySecretManagerState.getDTSequenceNumber());
     store.close();
+
   }
 
   private Token<AMRMTokenIdentifier> generateAMRMToken(

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1552210&r1=1552209&r2=1552210&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
Thu Dec 19 02:33:33 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -178,10 +179,11 @@ public class TestFSRMStateStore extends 
         @Override
         public void run() {
           try {
-            store.storeApplicationStateInternal("application1",
-              (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
-                .newApplicationStateData(111, 111, "user", null,
-                  RMAppState.ACCEPTED, "diagnostics", 333));
+            store.storeApplicationStateInternal(
+                ApplicationId.newInstance(100L, 1),
+                (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
+                    .newApplicationStateData(111, 111, "user", null,
+                        RMAppState.ACCEPTED, "diagnostics", 333));
           } catch (Exception e) {
             // TODO 0 datanode exception will not be retried by dfs client, fix
             // that separately.



Mime
View raw message