hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1555021 [3/7] - in /hadoop/common/branches/HDFS-5535/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/j...
Date Fri, 03 Jan 2014 07:27:13 GMT
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Fri Jan  3 07:26:52 2014
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.re
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -35,8 +37,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -54,6 +56,10 @@ public class RMContextImpl implements RM
   private final ConcurrentMap<String, RMNode> inactiveNodes
     = new ConcurrentHashMap<String, RMNode>();
 
+  private boolean isHAEnabled;
+  private HAServiceState haServiceState =
+      HAServiceProtocol.HAServiceState.INITIALIZING;
+  
   private AMLivelinessMonitor amLivelinessMonitor;
   private AMLivelinessMonitor amFinishingMonitor;
   private RMStateStore stateStore = null;
@@ -211,6 +217,16 @@ public class RMContextImpl implements RM
     return resourceTrackerService;
   }
 
+  void setHAEnabled(boolean isHAEnabled) {
+    this.isHAEnabled = isHAEnabled;
+  }
+
+  void setHAServiceState(HAServiceState haServiceState) {
+    synchronized (haServiceState) {
+      this.haServiceState = haServiceState;
+    }
+  }
+
   void setDispatcher(Dispatcher dispatcher) {
     this.rmDispatcher = dispatcher;
   }
@@ -290,4 +306,16 @@ public class RMContextImpl implements RM
       ResourceTrackerService resourceTrackerService) {
     this.resourceTrackerService = resourceTrackerService;
   }
+
+  @Override
+  public boolean isHAEnabled() {
+    return isHAEnabled;
+  }
+
+  @Override
+  public HAServiceState getHAServiceState() {
+    synchronized (haServiceState) {
+      return haServiceState;
+    }
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Fri Jan  3 07:26:52 2014
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpConfig.Policy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.YarnUncaug
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -78,7 +81,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
@@ -131,13 +133,7 @@ public class ResourceManager extends Com
    * in Active state.
    */
   protected RMActiveServices activeServices;
-  protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
-
-  protected RMContainerTokenSecretManager containerTokenSecretManager;
-  protected NMTokenSecretManagerInRM nmTokenSecretManager;
-
-  protected AMRMTokenSecretManager amRmTokenSecretManager;
-
+  protected RMSecretManagerService rmSecretManagerService;
   private Dispatcher rmDispatcher;
 
   protected ResourceScheduler scheduler;
@@ -151,7 +147,6 @@ public class ResourceManager extends Com
   protected RMAppManager rmAppManager;
   protected ApplicationACLsManager applicationACLsManager;
   protected QueueACLsManager queueACLsManager;
-  protected RMDelegationTokenSecretManager rmDTSecretManager;
   private DelegationTokenRenewer delegationTokenRenewer;
   private WebApp webApp;
   protected ResourceTrackerService resourceTracker;
@@ -188,6 +183,12 @@ public class ResourceManager extends Com
     addService(adminService);
     rmContext.setRMAdminService(adminService);
 
+    this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf));
+    if (this.rmContext.isHAEnabled()) {
+      HAUtil.verifyAndSetConfiguration(conf);
+    }
+    createAndInitActiveServices();
+
     super.serviceInit(conf);
   }
   
@@ -202,35 +203,19 @@ public class ResourceManager extends Com
     rmContext.setStateStore(rmStore);
   }
 
-  protected RMContainerTokenSecretManager createContainerTokenSecretManager(
-      Configuration conf) {
-    return new RMContainerTokenSecretManager(conf);
-  }
-
-  protected NMTokenSecretManagerInRM createNMTokenSecretManager(
-      Configuration conf) {
-    return new NMTokenSecretManagerInRM(conf);
-  }
-  
   protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
     return new SchedulerEventDispatcher(this.scheduler);
   }
 
   protected RMStateStoreOperationFailedEventDispatcher
-  createRMStateStoreOperationFailedEventDispatcher() {
-    return new RMStateStoreOperationFailedEventDispatcher(
-        rmContext.getRMAdminService());
+      createRMStateStoreOperationFailedEventDispatcher() {
+    return new RMStateStoreOperationFailedEventDispatcher(rmContext, this);
   }
 
   protected Dispatcher createDispatcher() {
     return new AsyncDispatcher();
   }
 
-  protected AMRMTokenSecretManager createAMRMTokenSecretManager(
-      Configuration conf) {
-    return new AMRMTokenSecretManager(conf);
-  }
-
   protected ResourceScheduler createScheduler() {
     String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
         YarnConfiguration.DEFAULT_RM_SCHEDULER);
@@ -316,11 +301,8 @@ public class ResourceManager extends Com
       addIfService(rmDispatcher);
       rmContext.setDispatcher(rmDispatcher);
 
-      clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM();
-      rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
-
-      amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
-      rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
+      rmSecretManagerService = createRMSecretManagerService();
+      addService(rmSecretManagerService);
 
       containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
       addService(containerAllocationExpirer);
@@ -334,12 +316,6 @@ public class ResourceManager extends Com
       addService(amFinishingMonitor);
       rmContext.setAMFinishingMonitor(amFinishingMonitor);
 
-      containerTokenSecretManager = createContainerTokenSecretManager(conf);
-      rmContext.setContainerTokenSecretManager(containerTokenSecretManager);
-
-      nmTokenSecretManager = createNMTokenSecretManager(conf);
-      rmContext.setNMTokenSecretManager(nmTokenSecretManager);
-
       boolean isRecoveryEnabled = conf.getBoolean(
           YarnConfiguration.RECOVERY_ENABLED,
           YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
@@ -427,8 +403,6 @@ public class ResourceManager extends Com
       rmAppManager = createRMAppManager();
       // Register event handler for RMAppManagerEvents
       rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
-      rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
-      rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
 
       clientRM = createClientRMService();
       rmContext.setClientRMService(clientRM);
@@ -452,10 +426,6 @@ public class ResourceManager extends Com
 
     @Override
     protected void serviceStart() throws Exception {
-      amRmTokenSecretManager.start();
-      containerTokenSecretManager.start();
-      nmTokenSecretManager.start();
-
       RMStateStore rmStore = rmContext.getStateStore();
       // The state store needs to start irrespective of recoveryEnabled as apps
       // need events to move to further states.
@@ -473,13 +443,7 @@ public class ResourceManager extends Com
           throw e;
         }
       }
-
       startWepApp();
-      try {
-        rmDTSecretManager.startThreads();
-      } catch(IOException ie) {
-        throw new YarnRuntimeException("Failed to start secret manager threads", ie);
-      }
 
       if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
         int port = webApp.port();
@@ -494,19 +458,7 @@ public class ResourceManager extends Com
       if (webApp != null) {
         webApp.stop();
       }
-      if (rmDTSecretManager != null) {
-        rmDTSecretManager.stopThreads();
-      }
 
-      if (amRmTokenSecretManager != null) {
-        amRmTokenSecretManager.stop();
-      }
-      if (containerTokenSecretManager != null) {
-        containerTokenSecretManager.stop();
-      }
-      if(nmTokenSecretManager != null) {
-        nmTokenSecretManager.stop();
-      }
 
       DefaultMetricsSystem.shutdown();
 
@@ -655,11 +607,14 @@ public class ResourceManager extends Com
   @Private
   public static class RMStateStoreOperationFailedEventDispatcher implements
       EventHandler<RMStateStoreOperationFailedEvent> {
-    private final AdminService adminService;
 
-    public RMStateStoreOperationFailedEventDispatcher(
-        AdminService adminService) {
-      this.adminService = adminService;
+    private final RMContext rmContext;
+    private final ResourceManager rm;
+
+    public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext,
+        ResourceManager resourceManager) {
+      this.rmContext = rmContext;
+      this.rm = resourceManager;
     }
 
     @Override
@@ -671,16 +626,14 @@ public class ResourceManager extends Com
       }
       if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
         LOG.info("RMStateStore has been fenced");
-        synchronized(adminService) {
-          if (adminService.haEnabled) {
-            try {
-              // Transition to standby and reinit active services
-              LOG.info("Transitioning RM to Standby mode");
-              adminService.transitionToStandby(true);
-              return;
-            } catch (Exception e) {
-              LOG.error("Failed to transition RM to Standby mode.");
-            }
+        if (rmContext.isHAEnabled()) {
+          try {
+            // Transition to standby and reinit active services
+            LOG.info("Transitioning RM to Standby mode");
+            rm.transitionToStandby(true);
+            return;
+          } catch (Exception e) {
+            LOG.error("Failed to transition RM to Standby mode.");
           }
         }
       }
@@ -826,10 +779,6 @@ public class ResourceManager extends Com
     webApp = builder.start(new RMWebApp(this));
   }
 
-  void setConf(Configuration configuration) {
-    conf = configuration;
-  }
-
   /**
    * Helper method to create and init {@link #activeServices}. This creates an
    * instance of {@link RMActiveServices} and initializes it.
@@ -870,6 +819,39 @@ public class ResourceManager extends Com
     return activeServices != null && activeServices.isInState(STATE.STARTED);
   }
 
+  synchronized void transitionToActive() throws Exception {
+    if (rmContext.getHAServiceState() ==
+        HAServiceProtocol.HAServiceState.ACTIVE) {
+      LOG.info("Already in active state");
+      return;
+    }
+
+    LOG.info("Transitioning to active state");
+    startActiveServices();
+    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
+    LOG.info("Transitioned to active state");
+  }
+
+  synchronized void transitionToStandby(boolean initialize)
+      throws Exception {
+    if (rmContext.getHAServiceState() ==
+        HAServiceProtocol.HAServiceState.STANDBY) {
+      LOG.info("Already in standby state");
+      return;
+    }
+
+    LOG.info("Transitioning to standby state");
+    if (rmContext.getHAServiceState() ==
+        HAServiceProtocol.HAServiceState.ACTIVE) {
+      stopActiveServices();
+      if (initialize) {
+        createAndInitActiveServices();
+      }
+    }
+    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
+    LOG.info("Transitioned to standby state");
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     try {
@@ -877,6 +859,13 @@ public class ResourceManager extends Com
     } catch(IOException ie) {
       throw new YarnRuntimeException("Failed to login", ie);
     }
+
+    if (this.rmContext.isHAEnabled()) {
+      transitionToStandby(true);
+    } else {
+      transitionToActive();
+    }
+
     super.serviceStart();
   }
   
@@ -888,34 +877,21 @@ public class ResourceManager extends Com
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();
+    transitionToStandby(false);
+    rmContext.setHAServiceState(HAServiceState.STOPPING);
   }
   
   protected ResourceTrackerService createResourceTrackerService() {
     return new ResourceTrackerService(this.rmContext, this.nodesListManager,
-        this.nmLivelinessMonitor, this.containerTokenSecretManager,
-        this.nmTokenSecretManager);
-  }
-
-  protected RMDelegationTokenSecretManager
-               createRMDelegationTokenSecretManager(RMContext rmContext) {
-    long secretKeyInterval = 
-        conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, 
-            YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
-    long tokenMaxLifetime =
-        conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
-            YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
-    long tokenRenewInterval =
-        conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 
-            YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
-
-    return new RMDelegationTokenSecretManager(secretKeyInterval, 
-        tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
+        this.nmLivelinessMonitor,
+        this.rmContext.getContainerTokenSecretManager(),
+        this.rmContext.getNMTokenSecretManager());
   }
 
   protected ClientRMService createClientRMService() {
     return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
         this.applicationACLsManager, this.queueACLsManager,
-        this.rmDTSecretManager);
+        getRMDTSecretManager());
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
@@ -926,6 +902,10 @@ public class ResourceManager extends Com
     return new AdminService(this, rmContext);
   }
 
+  protected RMSecretManagerService createRMSecretManagerService() {
+    return new RMSecretManagerService(conf, rmContext);
+  }
+
   @Private
   public ClientRMService getClientRMService() {
     return this.clientRM;
@@ -966,23 +946,28 @@ public class ResourceManager extends Com
 
   @Private
   public RMContainerTokenSecretManager getRMContainerTokenSecretManager() {
-    return this.containerTokenSecretManager;
+    return this.rmContext.getContainerTokenSecretManager();
   }
 
   @Private
   public NMTokenSecretManagerInRM getRMNMTokenSecretManager() {
-    return this.nmTokenSecretManager;
+    return this.rmContext.getNMTokenSecretManager();
   }
   
   @Private
   public AMRMTokenSecretManager getAMRMTokenSecretManager(){
-    return this.amRmTokenSecretManager;
+    return this.rmContext.getAMRMTokenSecretManager();
+  }
+
+  @Private
+  public RMDelegationTokenSecretManager getRMDTSecretManager(){
+    return this.rmContext.getRMDelegationTokenSecretManager();
   }
 
   @Override
   public void recover(RMState state) throws Exception {
     // recover RMdelegationTokenSecretManager
-    rmDTSecretManager.recover(state);
+    getRMDTSecretManager().recover(state);
 
     // recover applications
     rmAppManager.recover(state);

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Fri Jan  3 07:26:52 2014
@@ -287,11 +287,12 @@ public class FileSystemRMStateStore exte
   }
 
   @Override
-  public synchronized void storeApplicationStateInternal(String appId,
+  public synchronized void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    Path appDirPath = getAppDir(rmAppRoot, appId);
+    String appIdStr = appId.toString();
+    Path appDirPath = getAppDir(rmAppRoot, appIdStr);
     fs.mkdirs(appDirPath);
-    Path nodeCreatePath = getNodePath(appDirPath, appId);
+    Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
 
     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -306,10 +307,11 @@ public class FileSystemRMStateStore exte
   }
 
   @Override
-  public synchronized void updateApplicationStateInternal(String appId,
+  public synchronized void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    Path appDirPath = getAppDir(rmAppRoot, appId);
-    Path nodeCreatePath = getNodePath(appDirPath, appId);
+    String appIdStr = appId.toString();
+    Path appDirPath = getAppDir(rmAppRoot, appIdStr);
+    Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
 
     LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath);
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -325,14 +327,13 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    ApplicationAttemptId appAttemptId =
-        ConverterUtils.toApplicationAttemptId(attemptId);
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
-    Path nodeCreatePath = getNodePath(appDirPath, attemptId);
-    LOG.info("Storing info for attempt: " + attemptId + " at: "
+    Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+    LOG.info("Storing info for attempt: " + appAttemptId + " at: "
         + nodeCreatePath);
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
     try {
@@ -340,21 +341,20 @@ public class FileSystemRMStateStore exte
       // based on whether we have lost the right to write to FS
       writeFile(nodeCreatePath, attemptStateData);
     } catch (Exception e) {
-      LOG.info("Error storing info for attempt: " + attemptId, e);
+      LOG.info("Error storing info for attempt: " + appAttemptId, e);
       throw e;
     }
   }
 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    ApplicationAttemptId appAttemptId =
-        ConverterUtils.toApplicationAttemptId(attemptId);
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
-    Path nodeCreatePath = getNodePath(appDirPath, attemptId);
-    LOG.info("Updating info for attempt: " + attemptId + " at: "
+    Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+    LOG.info("Updating info for attempt: " + appAttemptId + " at: "
         + nodeCreatePath);
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
     try {
@@ -362,7 +362,7 @@ public class FileSystemRMStateStore exte
       // based on whether we have lost the right to write to FS
       updateFile(nodeCreatePath, attemptStateData);
     } catch (Exception e) {
-      LOG.info("Error updating info for attempt: " + attemptId, e);
+      LOG.info("Error updating info for attempt: " + appAttemptId, e);
       throw e;
     }
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Fri Jan  3 07:26:52 2014
@@ -80,7 +80,7 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
-  public void storeApplicationStateInternal(String appId, 
+  public void storeApplicationStateInternal(ApplicationId appId,
                                      ApplicationStateDataPBImpl appStateData)
       throws Exception {
     ApplicationState appState =
@@ -88,11 +88,11 @@ public class MemoryRMStateStore extends 
           appStateData.getStartTime(),
           appStateData.getApplicationSubmissionContext(),
           appStateData.getUser());
-    state.appState.put(appState.getAppId(), appState);
+    state.appState.put(appId, appState);
   }
 
   @Override
-  public void updateApplicationStateInternal(String appId,
+  public void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception {
     ApplicationState updatedAppState =
         new ApplicationState(appStateData.getSubmitTime(),
@@ -102,21 +102,19 @@ public class MemoryRMStateStore extends 
           appStateData.getDiagnostics(), appStateData.getFinishTime());
     LOG.info("Updating final state " + appStateData.getState() + " for app: "
         + appId);
-    ApplicationId applicationId = updatedAppState.getAppId();
-    if (state.appState.get(applicationId) != null) {
+    if (state.appState.get(appId) != null) {
       // add the earlier attempts back
       updatedAppState.attempts
-        .putAll(state.appState.get(applicationId).attempts);
+        .putAll(state.appState.get(appId).attempts);
     }
-    state.appState.put(applicationId, updatedAppState);
+    state.appState.put(appId, updatedAppState);
   }
 
   @Override
-  public synchronized void storeApplicationAttemptStateInternal(String attemptIdStr, 
-                            ApplicationAttemptStateDataPBImpl attemptStateData)
-                            throws Exception {
-    ApplicationAttemptId attemptId = ConverterUtils
-                                        .toApplicationAttemptId(attemptIdStr);
+  public synchronized void storeApplicationAttemptStateInternal(
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData)
+      throws Exception {
     Credentials credentials = null;
     if(attemptStateData.getAppAttemptTokens() != null){
       DataInputByteBuffer dibb = new DataInputByteBuffer();
@@ -125,7 +123,7 @@ public class MemoryRMStateStore extends 
       credentials.readTokenStorageStream(dibb);
     }
     ApplicationAttemptState attemptState =
-        new ApplicationAttemptState(attemptId,
+        new ApplicationAttemptState(appAttemptId,
           attemptStateData.getMasterContainer(), credentials,
           attemptStateData.getStartTime());
 
@@ -139,10 +137,9 @@ public class MemoryRMStateStore extends 
 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
-      String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData)
       throws Exception {
-    ApplicationAttemptId attemptId =
-        ConverterUtils.toApplicationAttemptId(attemptIdStr);
     Credentials credentials = null;
     if (attemptStateData.getAppAttemptTokens() != null) {
       DataInputByteBuffer dibb = new DataInputByteBuffer();
@@ -151,7 +148,7 @@ public class MemoryRMStateStore extends 
       credentials.readTokenStorageStream(dibb);
     }
     ApplicationAttemptState updatedAttemptState =
-        new ApplicationAttemptState(attemptId,
+        new ApplicationAttemptState(appAttemptId,
           attemptStateData.getMasterContainer(), credentials,
           attemptStateData.getStartTime(), attemptStateData.getState(),
           attemptStateData.getFinalTrackingUrl(),

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Fri Jan  3 07:26:52 2014
@@ -22,6 +22,8 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
@@ -51,13 +53,13 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected void storeApplicationStateInternal(String appId,
+  protected void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception {
     // Do nothing
   }
 
   @Override
-  protected void storeApplicationAttemptStateInternal(String attemptId,
+  protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
     // Do nothing
   }
@@ -92,13 +94,13 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected void updateApplicationStateInternal(String appId,
+  protected void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception {
     // Do nothing 
   }
 
   @Override
-  protected void updateApplicationAttemptStateInternal(String attemptId,
+  protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
   }
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Fri Jan  3 07:26:52 2014
@@ -387,10 +387,10 @@ public abstract class RMStateStore exten
    * Derived classes must implement this method to store the state of an 
    * application.
    */
-  protected abstract void storeApplicationStateInternal(String appId,
+  protected abstract void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception;
 
-  protected abstract void updateApplicationStateInternal(String appId,
+  protected abstract void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateData) throws Exception;
   
   @SuppressWarnings("unchecked")
@@ -424,10 +424,12 @@ public abstract class RMStateStore exten
    * Derived classes must implement this method to store the state of an 
    * application attempt
    */
-  protected abstract void storeApplicationAttemptStateInternal(String attemptId,
+  protected abstract void storeApplicationAttemptStateInternal(
+      ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
 
-  protected abstract void updateApplicationAttemptStateInternal(String attemptId,
+  protected abstract void updateApplicationAttemptStateInternal(
+      ApplicationAttemptId attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
 
   /**
@@ -592,11 +594,11 @@ public abstract class RMStateStore exten
       LOG.info("Storing info for app: " + appId);
       try {
         if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-          storeApplicationStateInternal(appId.toString(), appStateData);
+          storeApplicationStateInternal(appId, appStateData);
           notifyDoneStoringApplication(appId, storedException);
         } else {
           assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-          updateApplicationStateInternal(appId.toString(), appStateData);
+          updateApplicationStateInternal(appId, appStateData);
           notifyDoneUpdatingApplication(appId, storedException);
         }
       } catch (Exception e) {
@@ -637,15 +639,15 @@ public abstract class RMStateStore exten
           LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
         }
         if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-          storeApplicationAttemptStateInternal(attemptState.getAttemptId()
-            .toString(), attemptStateData);
+          storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+              attemptStateData);
           notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
               storedException);
         } else {
           assert event.getType().equals(
             RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-          updateApplicationAttemptStateInternal(attemptState.getAttemptId()
-            .toString(), attemptStateData);
+          updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+              attemptStateData);
           notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
               storedException);
         }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Fri Jan  3 07:26:52 2014
@@ -78,16 +78,51 @@ public class ZKRMStateStore extends RMSt
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
   protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
       .newInstance(1, 0);
+  private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
+      "RMDelegationTokensRoot";
+  private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
+      "RMDTSequentialNumber";
+  private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
+      "RMDTMasterKeysRoot";
   private int numRetries;
 
   private String zkHostPort = null;
   private int zkSessionTimeout;
   private long zkRetryInterval;
   private List<ACL> zkAcl;
+
+  /**
+   *
+   * ROOT_DIR_PATH
+   * |--- VERSION_INFO
+   * |--- RM_ZK_FENCING_LOCK
+   * |--- RM_APP_ROOT
+   * |     |----- (#ApplicationId1)
+   * |     |        |----- (#ApplicationAttemptIds)
+   * |     |
+   * |     |----- (#ApplicationId2)
+   * |     |       |----- (#ApplicationAttemptIds)
+   * |     ....
+   * |
+   * |--- RM_DT_SECRET_MANAGER_ROOT
+   *        |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
+   *        |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
+   *        |       |----- Token_1
+   *        |       |----- Token_2
+   *        |       ....
+   *        |
+   *        |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
+   *        |      |----- Key_1
+   *        |      |----- Key_2
+   *                ....
+   *
+   */
   private String zkRootNodePath;
-  private String rmDTSecretManagerRoot;
   private String rmAppRoot;
-  private String dtSequenceNumberPath = null;
+  private String rmDTSecretManagerRoot;
+  private String dtMasterKeysRootPath;
+  private String delegationTokensRootPath;
+  private String dtSequenceNumberPath;
 
   @VisibleForTesting
   protected String znodeWorkingPath;
@@ -178,12 +213,11 @@ public class ZKRMStateStore extends RMSt
       throw bafe;
     }
 
-    zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
-    rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
-    rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+    zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
+    rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
 
     /* Initialize fencing related paths, acls, and ops */
-    fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK;
+    fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
     createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
         CreateMode.PERSISTENT);
     deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
@@ -204,6 +238,15 @@ public class ZKRMStateStore extends RMSt
         zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
       }
     }
+
+    rmDTSecretManagerRoot =
+        getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
+    dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
+    delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
+    dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
+        RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
   }
 
   @Override
@@ -217,8 +260,11 @@ public class ZKRMStateStore extends RMSt
     if (HAUtil.isHAEnabled(getConfig())){
       fence();
     }
-    createRootDir(rmDTSecretManagerRoot);
     createRootDir(rmAppRoot);
+    createRootDir(rmDTSecretManagerRoot);
+    createRootDir(dtMasterKeysRootPath);
+    createRootDir(delegationTokensRootPath);
+    createRootDir(dtSequenceNumberPath);
   }
 
   private void createRootDir(final String rootPath) throws Exception {
@@ -350,26 +396,69 @@ public class ZKRMStateStore extends RMSt
 
   private synchronized void loadRMDTSecretManagerState(RMState rmState)
       throws Exception {
-    List<String> childNodes =
-        getChildrenWithRetries(rmDTSecretManagerRoot, true);
+    loadRMDelegationKeyState(rmState);
+    loadRMSequentialNumberState(rmState);
+    loadRMDelegationTokenState(rmState);
+  }
 
+  private void loadRMDelegationKeyState(RMState rmState) throws Exception {
+    List<String> childNodes =
+        getChildrenWithRetries(dtMasterKeysRootPath, true);
     for (String childNodeName : childNodes) {
-      if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
-        rmState.rmSecretManagerState.dtSequenceNumber =
-            Integer.parseInt(childNodeName.split("_")[1]);
+      String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
+      byte[] childData = getDataWithRetries(childNodePath, true);
+
+      if (childData == null) {
+        LOG.warn("Content of " + childNodePath + " is broken.");
         continue;
       }
-      String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
-      byte[] childData = getDataWithRetries(childNodePath, true);
 
       ByteArrayInputStream is = new ByteArrayInputStream(childData);
       DataInputStream fsIn = new DataInputStream(is);
+
       try {
         if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
           DelegationKey key = new DelegationKey();
           key.readFields(fsIn);
           rmState.rmSecretManagerState.masterKeyState.add(key);
-        } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+        }
+      } finally {
+        is.close();
+      }
+    }
+  }
+
+  private void loadRMSequentialNumberState(RMState rmState) throws Exception {
+    byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
+    if (seqData != null) {
+      ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
+      DataInputStream seqIn = new DataInputStream(seqIs);
+
+      try {
+        rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
+      } finally {
+        seqIn.close();
+      }
+    }
+  }
+
+  private void loadRMDelegationTokenState(RMState rmState) throws Exception {
+    List<String> childNodes = zkClient.getChildren(delegationTokensRootPath, true);
+    for (String childNodeName : childNodes) {
+      String childNodePath =
+          getNodePath(delegationTokensRootPath, childNodeName);
+      byte[] childData = getDataWithRetries(childNodePath, true);
+
+      if (childData == null) {
+        LOG.warn("Content of " + childNodePath + " is broken.");
+        continue;
+      }
+
+      ByteArrayInputStream is = new ByteArrayInputStream(childData);
+      DataInputStream fsIn = new DataInputStream(is);
+
+      try {
+        if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
           RMDelegationTokenIdentifier identifier =
               new RMDelegationTokenIdentifier();
           identifier.readFields(fsIn);
@@ -385,8 +474,6 @@ public class ZKRMStateStore extends RMSt
 
   private synchronized void loadRMAppState(RMState rmState) throws Exception {
     List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
-    List<ApplicationAttemptState> attempts =
-        new ArrayList<ApplicationAttemptState>();
     for (String childNodeName : childNodes) {
       String childNodePath = getNodePath(rmAppRoot, childNodeName);
       byte[] childData = getDataWithRetries(childNodePath, true);
@@ -411,17 +498,28 @@ public class ZKRMStateStore extends RMSt
               "from the application id");
         }
         rmState.appState.put(appId, appState);
-      } else if (childNodeName
-          .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
-        // attempt
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Loading application attempt from znode: " + childNodeName);
-        }
+        loadApplicationAttemptState(appState, appId);
+      } else {
+        LOG.info("Unknown child node with name: " + childNodeName);
+      }
+    }
+  }
+
+  private void loadApplicationAttemptState(ApplicationState appState,
+      ApplicationId appId)
+      throws Exception {
+    String appPath = getNodePath(rmAppRoot, appId.toString());
+    List<String> attempts = getChildrenWithRetries(appPath, false);
+    for (String attemptIDStr : attempts) {
+      if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+        String attemptPath = getNodePath(appPath, attemptIDStr);
+        byte[] attemptData = getDataWithRetries(attemptPath, true);
+
         ApplicationAttemptId attemptId =
-            ConverterUtils.toApplicationAttemptId(childNodeName);
+            ConverterUtils.toApplicationAttemptId(attemptIDStr);
         ApplicationAttemptStateDataPBImpl attemptStateData =
             new ApplicationAttemptStateDataPBImpl(
-                ApplicationAttemptStateDataProto.parseFrom(childData));
+                ApplicationAttemptStateDataProto.parseFrom(attemptData));
         Credentials credentials = null;
         if (attemptStateData.getAppAttemptTokens() != null) {
           credentials = new Credentials();
@@ -429,47 +527,26 @@ public class ZKRMStateStore extends RMSt
           dibb.reset(attemptStateData.getAppAttemptTokens());
           credentials.readTokenStorageStream(dibb);
         }
+
         ApplicationAttemptState attemptState =
             new ApplicationAttemptState(attemptId,
-              attemptStateData.getMasterContainer(), credentials,
-              attemptStateData.getStartTime(),
-              attemptStateData.getState(),
-              attemptStateData.getFinalTrackingUrl(),
-              attemptStateData.getDiagnostics(),
-              attemptStateData.getFinalApplicationStatus());
-        if (!attemptId.equals(attemptState.getAttemptId())) {
-          throw new YarnRuntimeException("The child node name is different " +
-              "from the application attempt id");
-        }
-        attempts.add(attemptState);
-      } else {
-        LOG.info("Unknown child node with name: " + childNodeName);
-      }
-    }
+                attemptStateData.getMasterContainer(), credentials,
+                attemptStateData.getStartTime(),
+                attemptStateData.getState(),
+                attemptStateData.getFinalTrackingUrl(),
+                attemptStateData.getDiagnostics(),
+                attemptStateData.getFinalApplicationStatus());
 
-    // go through all attempts and add them to their apps
-    for (ApplicationAttemptState attemptState : attempts) {
-      ApplicationId appId = attemptState.getAttemptId().getApplicationId();
-      ApplicationState appState = rmState.appState.get(appId);
-      if (appState != null) {
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
-      } else {
-        // the application znode may have been removed when the application
-        // completed but the RM might have stopped before it could remove the
-        // application attempt znodes
-        LOG.info("Application node not found for attempt: "
-            + attemptState.getAttemptId());
-        deleteWithRetries(
-            getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
       }
     }
     LOG.info("Done Loading applications from ZK state store");
   }
 
   @Override
-  public synchronized void storeApplicationStateInternal(String appId,
+  public synchronized void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, appId);
+    String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -481,25 +558,29 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  public synchronized void updateApplicationStateInternal(String appId,
+  public synchronized void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, appId);
+    String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing final state info for app: " + appId + " at: "
-          + nodeCreatePath);
+          + nodeUpdatePath);
     }
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
-    setDataWithRetries(nodeCreatePath, appStateData, 0);
+    setDataWithRetries(nodeUpdatePath, appStateData, 0);
   }
 
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    String appDirPath = getNodePath(rmAppRoot,
+        appAttemptId.getApplicationId().toString());
+    String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing info for attempt: " + attemptId + " at: "
+      LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
           + nodeCreatePath);
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -509,31 +590,36 @@ public class ZKRMStateStore extends RMSt
 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
-      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
       throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    String appAttemptIdStr = appAttemptId.toString();
+    String appDirPath = getNodePath(rmAppRoot, appIdStr);
+    String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing final state info for attempt: " + attemptId + " at: "
-          + nodeCreatePath);
+      LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+          + " at: " + nodeUpdatePath);
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-    setDataWithRetries(nodeCreatePath, attemptStateData, 0);
+    setDataWithRetries(nodeUpdatePath, attemptStateData, 0);
   }
 
   @Override
   public synchronized void removeApplicationStateInternal(ApplicationState appState)
       throws Exception {
     String appId = appState.getAppId().toString();
-    String nodeRemovePath = getNodePath(rmAppRoot, appId);
+    String appIdRemovePath = getNodePath(rmAppRoot, appId);
     ArrayList<Op> opList = new ArrayList<Op>();
-    opList.add(Op.delete(nodeRemovePath, -1));
 
     for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
-      String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
+      String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
       opList.add(Op.delete(attemptRemovePath, -1));
     }
+    opList.add(Op.delete(appIdRemovePath, -1));
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
+      LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
           + " and its attempts.");
     }
     doMultiWithRetries(opList);
@@ -546,38 +632,37 @@ public class ZKRMStateStore extends RMSt
     ArrayList<Op> opList = new ArrayList<Op>();
     // store RM delegation token
     String nodeCreatePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
-    ByteArrayOutputStream os = new ByteArrayOutputStream();
-    DataOutputStream fsOut = new DataOutputStream(os);
+    ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
+    DataOutputStream tokenOut = new DataOutputStream(tokenOs);
+    ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+    DataOutputStream seqOut = new DataOutputStream(seqOs);
+
     try {
-      rmDTIdentifier.write(fsOut);
-      fsOut.writeLong(renewDate);
+      rmDTIdentifier.write(tokenOut);
+      tokenOut.writeLong(renewDate);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Storing RMDelegationToken_" +
             rmDTIdentifier.getSequenceNumber());
       }
-      opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
+
+      opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
           CreateMode.PERSISTENT));
-    } finally {
-      os.close();
-    }
 
-    // store sequence number
-    String latestSequenceNumberPath =
-        getNodePath(rmDTSecretManagerRoot,
-            DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
-          latestSequenceNumber);
-    }
 
-    if (dtSequenceNumberPath != null) {
-      opList.add(Op.delete(dtSequenceNumberPath, -1));
+     seqOut.writeInt(latestSequenceNumber);
+     if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing " + dtSequenceNumberPath +
+            ". SequenceNumber: " + latestSequenceNumber);
+      }
+
+     opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
+    } finally {
+      tokenOs.close();
+      seqOs.close();
     }
-    opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
-        CreateMode.PERSISTENT));
-    dtSequenceNumberPath = latestSequenceNumberPath;
+
     doMultiWithRetries(opList);
   }
 
@@ -585,7 +670,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void removeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
     String nodeRemovePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationToken_"
@@ -598,7 +683,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void storeRMDTMasterKeyState(
       DelegationKey delegationKey) throws Exception {
     String nodeCreatePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
             + delegationKey.getKeyId());
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     DataOutputStream fsOut = new DataOutputStream(os);
@@ -618,7 +703,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void removeRMDTMasterKeyState(
       DelegationKey delegationKey) throws Exception {
     String nodeRemovePath =
-        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
             + delegationKey.getKeyId());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
@@ -757,8 +842,7 @@ public class ZKRMStateStore extends RMSt
     return new ZKAction<byte[]>() {
       @Override
       public byte[] run() throws KeeperException, InterruptedException {
-        Stat stat = new Stat();
-        return zkClient.getData(path, watch, stat);
+        return zkClient.getData(path, watch, null);
       }
     }.runWithRetries();
   }
@@ -865,4 +949,5 @@ public class ZKRMStateStore extends RMSt
     zk.register(new ForwardingWatcher());
     return zk;
   }
+
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Fri Jan  3 07:26:52 2014
@@ -197,13 +197,13 @@ public interface RMApp extends EventHand
   String getApplicationType(); 
 
   /**
-   * Check whether this application is safe to unregister.
-   * An application is deemed to be safe to unregister if it is an unmanaged
-   * AM or its state has been removed from state store.
+   * Check whether this application is safe to terminate.
+   * An application is deemed to be safe to terminate if it is an unmanaged
+   * AM or its state has been saved in state store.
    * @return the flag which indicates whether this application is safe to
-   *         unregister.
+   *         terminate.
    */
-  boolean isAppSafeToUnregister();
+  boolean isAppSafeToTerminate();
 
   /**
    * Create the external user-facing state of ApplicationMaster from the

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Fri Jan  3 07:26:52 2014
@@ -24,9 +24,11 @@ public enum RMAppEventType {
   RECOVER,
   KILL,
 
-  // Source: RMAppAttempt
+  // Source: Scheduler
   APP_REJECTED,
   APP_ACCEPTED,
+
+  // Source: RMAppAttempt
   ATTEMPT_REGISTERED,
   ATTEMPT_UNREGISTERED,
   ATTEMPT_FINISHED, // Will send the final state
@@ -37,5 +39,4 @@ public enum RMAppEventType {
   // Source: RMStateStore
   APP_NEW_SAVED,
   APP_UPDATE_SAVED,
-  APP_REMOVED
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Jan  3 07:26:52 2014
@@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -110,10 +112,14 @@ public class RMAppImpl implements RMApp,
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
+
+  // These states stored are only valid when app is at killing or final_saving.
+  private RMAppState stateBeforeKilling;
   private RMAppState stateBeforeFinalSaving;
   private RMAppEvent eventCausingFinalSaving;
   private RMAppState targetedFinalState;
   private RMAppState recoveredFinalState;
+
   Object transitionTodo;
 
   private static final StateMachineFactory<RMAppImpl,
@@ -132,7 +138,7 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppNewlySavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
-            RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
+            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
             RMAppState.KILLED, RMAppState.FINAL_SAVING),
         RMAppEventType.RECOVER, new RMAppRecoveredTransition())
     .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
@@ -147,7 +153,7 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
-        RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition())
+        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
         RMAppEventType.KILL,
         new FinalSavingTransition(
@@ -165,11 +171,12 @@ public class RMAppImpl implements RMApp,
         new FinalSavingTransition(
           new AppRejectedTransition(), RMAppState.FAILED))
     .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
-        RMAppEventType.APP_ACCEPTED)
+        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
         RMAppEventType.KILL,
         new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+          new AppKilledTransition(), RMAppState.KILLED))
+
 
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -177,13 +184,22 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
-        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
+        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
+        // ACCEPTED state is possible to receive ATTEMPT_FAILED event because
+        // RMAppRecoveredTransition is returning ACCEPTED state directly and
+        // waiting for the previous AM to exit.
         RMAppEventType.ATTEMPT_FAILED,
-        new AttemptFailedTransition(RMAppState.SUBMITTED))
+        new AttemptFailedTransition(RMAppState.ACCEPTED))
     .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
         RMAppEventType.KILL,
         new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+          new AppKilledTransition(), RMAppState.KILLED))
+    // ACCECPTED state can once again receive APP_ACCEPTED event, because on
+    // recovery the app returns ACCEPTED state and the app once again go
+    // through the scheduler and triggers one more APP_ACCEPTED event at
+    // ACCEPTED state.
+    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+        RMAppEventType.APP_ACCEPTED)
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -197,13 +213,11 @@ public class RMAppImpl implements RMApp,
       // UnManagedAM directly jumps to finished
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
     .addTransition(RMAppState.RUNNING,
-        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
+        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
-        new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new KillAppAndAttemptTransition(), RMAppState.KILLED))
+        new AttemptFailedTransition(RMAppState.ACCEPTED))
+    .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
+        RMAppEventType.KILL, new KillAttemptTransition())
 
      // Transitions from FINAL_SAVING state
     .addTransition(RMAppState.FINAL_SAVING,
@@ -221,11 +235,27 @@ public class RMAppImpl implements RMApp,
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
-    .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
-        RMAppEventType.KILL, new KillAppAndAttemptTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-      EnumSet.of(RMAppEventType.NODE_UPDATE))
+      EnumSet.of(RMAppEventType.NODE_UPDATE,
+        // ignore Kill as we have already saved the final Finished state in
+        // state store.
+        RMAppEventType.KILL))
+
+     // Transitions from KILLING state
+    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_KILLED,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+        EnumSet.of(
+            RMAppEventType.NODE_UPDATE,
+            RMAppEventType.ATTEMPT_REGISTERED,
+            RMAppEventType.ATTEMPT_UNREGISTERED,
+            RMAppEventType.ATTEMPT_FINISHED,
+            RMAppEventType.ATTEMPT_FAILED,
+            RMAppEventType.APP_UPDATE_SAVED,
+            RMAppEventType.KILL))
 
      // Transitions from FINISHED state
      // ignorable transitions
@@ -249,7 +279,7 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
+            RMAppEventType.NODE_UPDATE))
 
      .installTopology();
 
@@ -419,6 +449,7 @@ public class RMAppImpl implements RMApp,
     case ACCEPTED:
     case RUNNING:
     case FINAL_SAVING:
+    case KILLING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
     case FINISHING:
@@ -626,7 +657,7 @@ public class RMAppImpl implements RMApp,
         ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
-          submissionContext, conf, user);
+          submissionContext, conf);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
     if(startAttempt) {
@@ -680,29 +711,46 @@ public class RMAppImpl implements RMApp,
         return app.recoveredFinalState;
       }
 
+      // Notify scheduler about the app on recovery
+      new AddApplicationToSchedulerTransition().transition(app, event);
+
       // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved。
+      // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
-        app.createNewAttempt(true);
         return RMAppState.SUBMITTED;
       }
 
-      return RMAppState.RUNNING;
+      // YARN-1507 is saving the application state after the application is
+      // accepted. So after YARN-1507, an app is saved meaning it is accepted.
+      // Thus we return ACCECPTED state on recovery.
+      return RMAppState.ACCEPTED;
     }
   }
 
-  private static final class StartAppAttemptTransition extends RMAppTransition {
+  private static final class AddApplicationToSchedulerTransition extends
+      RMAppTransition {
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
-      if (storeEvent.getStoredException() != null) {
+      if (event instanceof RMAppNewSavedEvent) {
+        RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
         // For HA this exception needs to be handled by giving up
         // master status if we got fenced
-        LOG.error(
-          "Failed to store application: " + storeEvent.getApplicationId(),
-          storeEvent.getStoredException());
-        ExitUtil.terminate(1, storeEvent.getStoredException());
+        if (((RMAppNewSavedEvent) event).getStoredException() != null) {
+          LOG.error(
+            "Failed to store application: " + storeEvent.getApplicationId(),
+            storeEvent.getStoredException());
+          ExitUtil.terminate(1, storeEvent.getStoredException());
+        }
       }
+      app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
+        app.submissionContext.getQueue(), app.user));
+    }
+  }
+
+  private static final class StartAppAttemptTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
       app.createNewAttempt(true);
     };
   }
@@ -811,7 +859,7 @@ public class RMAppImpl implements RMApp,
       RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
       diags = getAppAttemptFailedDiagnostics(failedEvent);
       break;
-    case KILL:
+    case ATTEMPT_KILLED:
       diags = getAppKilledDiagnostics();
       break;
     default:
@@ -901,7 +949,7 @@ public class RMAppImpl implements RMApp,
   private static class AppKilledTransition extends FinalTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.diagnostics.append("Application killed by user.");
+      app.diagnostics.append(getAppKilledDiagnostics());
       super.transition(app, event);
     };
   }
@@ -910,15 +958,16 @@ public class RMAppImpl implements RMApp,
     return "Application killed by user.";
   }
 
-  private static class KillAppAndAttemptTransition extends AppKilledTransition {
+  private static class KillAttemptTransition extends RMAppTransition {
     @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
-          RMAppAttemptEventType.KILL));
-      super.transition(app, event);
+      app.stateBeforeKilling = app.getState();
+      app.handler.handle(new RMAppAttemptEvent(app.currentAttempt
+        .getAppAttemptId(), RMAppAttemptEventType.KILL));
     }
   }
+
   private static final class AppRejectedTransition extends
       FinalTransition{
     public void transition(RMAppImpl app, RMAppEvent event) {
@@ -949,6 +998,8 @@ public class RMAppImpl implements RMApp,
       if (app.finishTime == 0 ) {
         app.finishTime = System.currentTimeMillis();
       }
+      app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, app
+        .getState()));
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -977,7 +1028,6 @@ public class RMAppImpl implements RMApp,
         return RMAppState.FINAL_SAVING;
       }
     }
-
   }
 
   @Override
@@ -986,7 +1036,7 @@ public class RMAppImpl implements RMApp,
   }
 
   @Override
-  public boolean isAppSafeToUnregister() {
+  public boolean isAppSafeToTerminate() {
     RMAppState state = getState();
     return state.equals(RMAppState.FINISHING)
         || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
@@ -1003,6 +1053,9 @@ public class RMAppImpl implements RMApp,
     if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
       rmAppState = stateBeforeFinalSaving;
     }
+    if (rmAppState.equals(RMAppState.KILLING)) {
+      rmAppState = stateBeforeKilling;
+    }
     switch (rmAppState) {
     case NEW:
       return YarnApplicationState.NEW;

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Fri Jan  3 07:26:52 2014
@@ -28,5 +28,6 @@ public enum RMAppState {
   FINISHING,
   FINISHED,
   FAILED,
+  KILLING,
   KILLED
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Fri Jan  3 07:26:52 2014
@@ -45,8 +45,7 @@ public enum RMAppAttemptEventType {
   ATTEMPT_UPDATE_SAVED,
 
   // Source: Scheduler
-  APP_REJECTED,
-  APP_ACCEPTED,
+  ATTEMPT_ADDED,
   
   // Source: RMAttemptImpl.recover
   RECOVER

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Jan  3 07:26:52 2014
@@ -75,20 +75,18 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
@@ -139,7 +137,7 @@ public class RMAppAttemptImpl implements
 
   private float progress = 0;
   private String host = "N/A";
-  private int rpcPort;
+  private int rpcPort = -1;
   private String originalTrackingUrl = "N/A";
   private String proxiedTrackingUrl = "N/A";
   private long startTime = 0;
@@ -150,7 +148,6 @@ public class RMAppAttemptImpl implements
   private final StringBuilder diagnostics = new StringBuilder();
 
   private Configuration conf;
-  private String user;
   
   private static final ExpiredTransition EXPIRED_TRANSITION =
       new ExpiredTransition();
@@ -186,14 +183,10 @@ public class RMAppAttemptImpl implements
           RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
           
       // Transitions from SUBMITTED state
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
-          RMAppAttemptEventType.APP_REJECTED,
-          new FinalSavingTransition(new AppRejectedTransition(),
-            RMAppAttemptState.FAILED))
       .addTransition(RMAppAttemptState.SUBMITTED, 
           EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                      RMAppAttemptState.SCHEDULED),
-          RMAppAttemptEventType.APP_ACCEPTED, 
+          RMAppAttemptEventType.ATTEMPT_ADDED,
           new ScheduleTransition())
       .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
@@ -361,6 +354,8 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.STATUS_UPDATE,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
+            // ignore Kill as we have already saved the final Finished state in
+            // state store.
               RMAppAttemptEventType.KILL))
 
       // Transitions from FINISHED State
@@ -378,8 +373,7 @@ public class RMAppAttemptImpl implements
       .addTransition(
           RMAppAttemptState.KILLED,
           RMAppAttemptState.KILLED,
-          EnumSet.of(RMAppAttemptEventType.APP_ACCEPTED,
-              RMAppAttemptEventType.APP_REJECTED,
+          EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.LAUNCHED,
               RMAppAttemptEventType.LAUNCH_FAILED,
@@ -396,7 +390,7 @@ public class RMAppAttemptImpl implements
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, String user) {
+      Configuration conf) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
     this.rmContext = rmContext;
@@ -412,7 +406,6 @@ public class RMAppAttemptImpl implements
     this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
     
     this.stateMachine = stateMachineFactory.make(this);
-    this.user = user;
   }
 
   @Override
@@ -524,6 +517,11 @@ public class RMAppAttemptImpl implements
     proxiedTrackingUrl = originalTrackingUrl;
   }
 
+  private void invalidateAMHostAndPort() {
+    this.host = "N/A";
+    this.rpcPort = -1;
+  }
+
   // This is only used for RMStateStore. Normal operation must invoke the secret
   // manager to get the key and not use the local key directly.
   @Override
@@ -742,36 +740,9 @@ public class RMAppAttemptImpl implements
           new Token<AMRMTokenIdentifier>(id,
             appAttempt.rmContext.getAMRMTokenSecretManager());
 
-      // Add the application to the scheduler
-      appAttempt.eventHandler.handle(
-          new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
-              appAttempt.submissionContext.getQueue(), appAttempt.user));
-    }
-  }
-
-  private static final class AppRejectedTransition extends BaseTransition {
-    @Override
-    public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-
-      RMAppAttemptRejectedEvent rejectedEvent = (RMAppAttemptRejectedEvent) event;
-
-      // Tell the AMS. Unregister from the ApplicationMasterService
-      appAttempt.masterService
-          .unregisterAttempt(appAttempt.applicationAttemptId);
-      
-      // Save the diagnostic message
-      String message = rejectedEvent.getMessage();
-      appAttempt.diagnostics.append(message);
-
-      // Send the rejection event to app
-      appAttempt.eventHandler.handle(
-          new RMAppRejectedEvent(
-              rejectedEvent.getApplicationAttemptId().getApplicationId(),
-              message)
-          );
-
-      appAttempt.removeCredentials(appAttempt);
+      // Add the applicationAttempt to the scheduler
+      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
+        appAttempt.applicationAttemptId));
     }
   }
 
@@ -787,11 +758,6 @@ public class RMAppAttemptImpl implements
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
       if (!appAttempt.submissionContext.getUnmanagedAM()) {
-        // Send the acceptance to the app
-        appAttempt.eventHandler.handle(new RMAppEvent(event
-            .getApplicationAttemptId().getApplicationId(),
-            RMAppEventType.APP_ACCEPTED));
-
         // Request a container for the AM.
         ResourceRequest request =
             BuilderUtils.newResourceRequest(
@@ -911,11 +877,6 @@ public class RMAppAttemptImpl implements
     FinalApplicationStatus finalStatus = null;
 
     switch (event.getType()) {
-    case APP_REJECTED:
-      RMAppAttemptRejectedEvent rejectedEvent =
-          (RMAppAttemptRejectedEvent) event;
-      diags = rejectedEvent.getMessage();
-      break;
     case LAUNCH_FAILED:
       RMAppAttemptLaunchFailedEvent launchFaileEvent =
           (RMAppAttemptLaunchFailedEvent) event;
@@ -1031,6 +992,7 @@ public class RMAppAttemptImpl implements
         {
           // don't leave the tracking URL pointing to a non-existent AM
           appAttempt.setTrackingUrlToRMAppPage();
+          appAttempt.invalidateAMHostAndPort();
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
                   RMAppEventType.ATTEMPT_KILLED,
@@ -1041,6 +1003,7 @@ public class RMAppAttemptImpl implements
         {
           // don't leave the tracking URL pointing to a non-existent AM
           appAttempt.setTrackingUrlToRMAppPage();
+          appAttempt.invalidateAMHostAndPort();
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
                   RMAppEventType.ATTEMPT_FAILED,
@@ -1055,9 +1018,8 @@ public class RMAppAttemptImpl implements
       }
 
       appAttempt.eventHandler.handle(appEvent);
-      appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
-        finalAttemptState));
-
+      appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
+        appAttemptId, finalAttemptState));
       appAttempt.removeCredentials(appAttempt);
     }
   }
@@ -1083,16 +1045,6 @@ public class RMAppAttemptImpl implements
     public void transition(RMAppAttemptImpl appAttempt,
                             RMAppAttemptEvent event) {
       appAttempt.checkAttemptStoreError(event);
-      // Send the acceptance to the app
-      // Ideally this should have been done when the scheduler accepted the app.
-      // But its here because until the attempt is saved the client should not
-      // launch the unmanaged AM. Client waits for the app status to be accepted
-      // before doing so. So we have to delay the accepted state until we have 
-      // completed storing the attempt
-      appAttempt.eventHandler.handle(new RMAppEvent(event
-          .getApplicationAttemptId().getApplicationId(),
-          RMAppEventType.APP_ACCEPTED));
-      
       super.transition(appAttempt, event);
     }    
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java Fri Jan  3 07:26:52 2014
@@ -56,7 +56,7 @@ public class ActiveUsersManager {
    * @param user application user 
    * @param applicationId activated application
    */
-  @Lock({Queue.class, SchedulerApplication.class})
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
   synchronized public void activateApplication(
       String user, ApplicationId applicationId) {
     Set<ApplicationId> userApps = usersApplications.get(user);
@@ -79,7 +79,7 @@ public class ActiveUsersManager {
    * @param user application user 
    * @param applicationId deactivated application
    */
-  @Lock({Queue.class, SchedulerApplication.class})
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
   synchronized public void deactivateApplication(
       String user, ApplicationId applicationId) {
     Set<ApplicationId> userApps = usersApplications.get(user);
@@ -102,7 +102,7 @@ public class ActiveUsersManager {
    * resource requests.
    * @return number of active users
    */
-  @Lock({Queue.class, SchedulerApplication.class})
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
   synchronized public int getNumActiveUsers() {
     return activeUsers;
   }



Mime
View raw message