hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1537330 [6/12] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java...
Date Wed, 30 Oct 2013 22:22:36 GMT
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNMInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNMInfo.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNMInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNMInfo.java Wed Oct 30 22:21:59 2013
@@ -90,6 +90,8 @@ public class RMNMInfo implements RMNMInf
                         ni.getLastHealthReportTime());
         info.put("HealthReport",
                         ni.getHealthReport());
+        info.put("NodeManagerVersion",
+                ni.getNodeManagerVersion());
         if(report != null) {
           info.put("NumContainers", report.getNumContainers());
           info.put("UsedMemoryMB", report.getUsedResource().getMemory());

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java Wed Oct 30 22:21:59 2013
@@ -18,10 +18,15 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -124,6 +129,46 @@ public class RMServerUtils {
     }
   }
 
+  /**
+   * Verifies the current user against {@code acl}. If the user is denied,
+   * logs, audits (target "AdminService") and throws AccessControlException.
+   * @param acl the {@link AccessControlList} to check against
+   * @param method the method name used in log and audit messages
+   * @param LOG the logger to use
+   * @return {@link UserGroupInformation} of the current user
+   * @throws IOException if the current user cannot be determined
+   */
+  public static UserGroupInformation verifyAccess(
+      AccessControlList acl, String method, final Log LOG)
+      throws IOException {
+    UserGroupInformation user;
+    try {
+      user = UserGroupInformation.getCurrentUser();
+    } catch (IOException ioe) {
+      LOG.warn("Couldn't get current user", ioe);
+      RMAuditLogger.logFailure("UNKNOWN", method, acl.toString(),
+          "AdminService", "Couldn't get current user");
+      throw ioe;
+    }
+
+    if (!acl.isUserAllowed(user)) {
+      LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
+          " to call '" + method + "'");
+
+      RMAuditLogger.logFailure(user.getShortUserName(), method,
+          acl.toString(), "AdminService",
+          RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
+
+      throw new AccessControlException("User " + user.getShortUserName() +
+              " doesn't have permission" +
+              " to call '" + method + "'");
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(method + " invoked by user " + user.getShortUserName());
+    }
+    return user;
+  }
+
   public static YarnApplicationState createApplicationState(
       RMAppState rmAppState) {
     switch (rmAppState) {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Oct 30 22:21:59 2013
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.http.HttpConfig.Policy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
@@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
@@ -88,6 +91,7 @@ import org.apache.hadoop.yarn.server.web
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -105,11 +109,27 @@ public class ResourceManager extends Com
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
-  public static final long clusterTimeStamp = System.currentTimeMillis();
+  private static long clusterTimeStamp = System.currentTimeMillis();
 
+  /**
+   * "Always On" services. Services that need to run always irrespective of
+   * the HA state of the RM.
+   */
+  @VisibleForTesting
+  protected RMHAProtocolService haService;
+
+  /**
+   * "Active" services. Services that need to run only on the Active RM.
+   * These services are managed (initialized, started, stopped) by the
+   * {@link CompositeService} RMActiveServices.
+   *
+   * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is
+   * in Active state.
+   */
+  protected RMActiveServices activeServices;
   protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager =
       new ClientToAMTokenSecretManagerInRM();
-  
+
   protected RMContainerTokenSecretManager containerTokenSecretManager;
   protected NMTokenSecretManagerInRM nmTokenSecretManager;
 
@@ -128,6 +148,7 @@ public class ResourceManager extends Com
   private EventHandler<SchedulerEvent> schedulerDispatcher;
   protected RMAppManager rmAppManager;
   protected ApplicationACLsManager applicationACLsManager;
+  protected QueueACLsManager queueACLsManager;
   protected RMDelegationTokenSecretManager rmDTSecretManager;
   private DelegationTokenRenewer delegationTokenRenewer;
   private WebApp webApp;
@@ -135,6 +156,8 @@ public class ResourceManager extends Com
   protected ResourceTrackerService resourceTracker;
   private boolean recoveryEnabled;
 
+  /** End of Active services */
+
   private Configuration conf;
   
   public ResourceManager() {
@@ -144,145 +167,41 @@ public class ResourceManager extends Com
   public RMContext getRMContext() {
     return this.rmContext;
   }
-  
+
+  public static long getClusterTimeStamp() {
+    return clusterTimeStamp;
+  }
+
+  @VisibleForTesting
+  protected static void setClusterTimeStamp(long timestamp) {
+    clusterTimeStamp = timestamp;
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-
     validateConfigs(conf);
-
     this.conf = conf;
 
-    this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
-
-    this.rmDispatcher = createDispatcher();
-    addIfService(this.rmDispatcher);
-
-    this.amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
-
-    this.containerAllocationExpirer = new ContainerAllocationExpirer(
-        this.rmDispatcher);
-    addService(this.containerAllocationExpirer);
-
-    AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
-    addService(amLivelinessMonitor);
-
-    AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
-    addService(amFinishingMonitor);
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      this.delegationTokenRenewer = createDelegationTokenRenewer();
-      addService(delegationTokenRenewer);
-    }
-
-    this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
-    this.nmTokenSecretManager = createNMTokenSecretManager(conf);
-    
-    boolean isRecoveryEnabled = conf.getBoolean(
-        YarnConfiguration.RECOVERY_ENABLED,
-        YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
-    
-    RMStateStore rmStore = null;
-    if(isRecoveryEnabled) {
-      recoveryEnabled = true;
-      rmStore =  RMStateStoreFactory.getStore(conf);
-    } else {
-      recoveryEnabled = false;
-      rmStore = new NullRMStateStore();
-    }
-
-    try {
-      rmStore.init(conf);
-      rmStore.setRMDispatcher(rmDispatcher);
-    } catch (Exception e) {
-      // the Exception from stateStore.init() needs to be handled for 
-      // HA and we need to give up master status if we got fenced
-      LOG.error("Failed to init state store", e);
-      ExitUtil.terminate(1, e);
-    }
-
-    this.rmContext =
-        new RMContextImpl(this.rmDispatcher, rmStore,
-          this.containerAllocationExpirer, amLivelinessMonitor,
-          amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager,
-          this.containerTokenSecretManager, this.nmTokenSecretManager,
-          this.clientToAMSecretManager);
-    
-    // Register event handler for NodesListManager
-    this.nodesListManager = new NodesListManager(this.rmContext);
-    this.rmDispatcher.register(NodesListManagerEventType.class, 
-        this.nodesListManager);
-    addService(nodesListManager);
-
-    // Initialize the scheduler
-    this.scheduler = createScheduler();
-    this.schedulerDispatcher = createSchedulerEventDispatcher();
-    addIfService(this.schedulerDispatcher);
-    this.rmDispatcher.register(SchedulerEventType.class,
-        this.schedulerDispatcher);
-
-    // Register event handler for RmAppEvents
-    this.rmDispatcher.register(RMAppEventType.class,
-        new ApplicationEventDispatcher(this.rmContext));
-
-    // Register event handler for RmAppAttemptEvents
-    this.rmDispatcher.register(RMAppAttemptEventType.class,
-        new ApplicationAttemptEventDispatcher(this.rmContext));
-
-    // Register event handler for RmNodes
-    this.rmDispatcher.register(RMNodeEventType.class,
-        new NodeEventDispatcher(this.rmContext));    
-
-    this.nmLivelinessMonitor = createNMLivelinessMonitor();
-    addService(this.nmLivelinessMonitor);
-
-    this.resourceTracker = createResourceTrackerService();
-    addService(resourceTracker);
-
-    DefaultMetricsSystem.initialize("ResourceManager");
-    JvmMetrics.initSingleton("ResourceManager", null);
-
-    try {
-      this.scheduler.reinitialize(conf, this.rmContext);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed to initialize scheduler", ioe);
-    }
-
-    // creating monitors that handle preemption
-    createPolicyMonitors();
-
-    masterService = createApplicationMasterService();
-    addService(masterService) ;
-
-    this.applicationACLsManager = new ApplicationACLsManager(conf);
-
-    this.rmAppManager = createRMAppManager();
-    // Register event handler for RMAppManagerEvents
-    this.rmDispatcher.register(RMAppManagerEventType.class,
-        this.rmAppManager);
-    this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
-    clientRM = createClientRMService();
-    addService(clientRM);
-    
-    adminService = createAdminService(clientRM, masterService, resourceTracker);
-    addService(adminService);
-
-    this.applicationMasterLauncher = createAMLauncher();
-    this.rmDispatcher.register(AMLauncherEventType.class, 
-        this.applicationMasterLauncher);
-
-    addService(applicationMasterLauncher);
-
-    new RMNMInfo(this.rmContext, this.scheduler);
-    
+    haService = createRMHAProtocolService();
+    addService(haService);
     super.serviceInit(conf);
   }
   
+  protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
+      Configuration conf) {
+    return new QueueACLsManager(scheduler, conf);
+  }
+
   @VisibleForTesting
   protected void setRMStateStore(RMStateStore rmStore) {
     rmStore.setRMDispatcher(rmDispatcher);
     ((RMContextImpl) rmContext).setStateStore(rmStore);
   }
 
+  protected RMHAProtocolService createRMHAProtocolService() {
+    return new RMHAProtocolService(this);
+  }
+
   protected RMContainerTokenSecretManager createContainerTokenSecretManager(
       Configuration conf) {
     return new RMContainerTokenSecretManager(conf);
@@ -374,6 +293,216 @@ public class ResourceManager extends Com
     }
   }
 
+  /** Composite of the services that run only on the Active RM.
+   * NOTE(review): serviceInit reads the outer RM's conf, not its argument.
+   */
+  @Private
+  class RMActiveServices extends CompositeService {
+    RMActiveServices() {
+      super("RMActiveServices");
+    }
+
+    @Override
+    protected void serviceInit(Configuration configuration) throws Exception {
+      conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+      rmDispatcher = createDispatcher();
+      addIfService(rmDispatcher);
+
+      amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
+
+      containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
+      addService(containerAllocationExpirer);
+
+      AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
+      addService(amLivelinessMonitor);
+
+      AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
+      addService(amFinishingMonitor);
+
+      containerTokenSecretManager = createContainerTokenSecretManager(conf);
+      nmTokenSecretManager = createNMTokenSecretManager(conf);
+
+      boolean isRecoveryEnabled = conf.getBoolean(
+          YarnConfiguration.RECOVERY_ENABLED,
+          YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+
+      RMStateStore rmStore = null;
+      if(isRecoveryEnabled) {
+        recoveryEnabled = true;
+        rmStore =  RMStateStoreFactory.getStore(conf);
+      } else {
+        recoveryEnabled = false;
+        rmStore = new NullRMStateStore();
+      }
+
+      try {
+        rmStore.init(conf);
+        rmStore.setRMDispatcher(rmDispatcher);
+      } catch (Exception e) {
+        // TODO(HA): a store-init failure (e.g. after being fenced) should make
+        // this RM give up Active status rather than terminate the process.
+        LOG.error("Failed to init state store", e);
+        ExitUtil.terminate(1, e);
+      }
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        delegationTokenRenewer = createDelegationTokenRenewer();
+      }
+
+      rmContext = new RMContextImpl(
+          rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor,
+          amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager,
+          containerTokenSecretManager, nmTokenSecretManager,
+          clientToAMSecretManager);
+
+      // Register event handler for NodesListManager
+      nodesListManager = new NodesListManager(rmContext);
+      rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
+      addService(nodesListManager);
+
+      // Initialize the scheduler
+      scheduler = createScheduler();
+      schedulerDispatcher = createSchedulerEventDispatcher();
+      addIfService(schedulerDispatcher);
+      rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
+
+      // Register event handler for RmAppEvents
+      rmDispatcher.register(RMAppEventType.class,
+          new ApplicationEventDispatcher(rmContext));
+
+      // Register event handler for RmAppAttemptEvents
+      rmDispatcher.register(RMAppAttemptEventType.class,
+          new ApplicationAttemptEventDispatcher(rmContext));
+
+      // Register event handler for RmNodes
+      rmDispatcher.register(
+          RMNodeEventType.class, new NodeEventDispatcher(rmContext));
+
+      nmLivelinessMonitor = createNMLivelinessMonitor();
+      addService(nmLivelinessMonitor);
+
+      resourceTracker = createResourceTrackerService();
+      addService(resourceTracker);
+
+      DefaultMetricsSystem.initialize("ResourceManager");
+      JvmMetrics.initSingleton("ResourceManager", null);
+
+      try {
+        scheduler.reinitialize(conf, rmContext);
+      } catch (IOException ioe) {
+        throw new RuntimeException("Failed to initialize scheduler", ioe);
+      }
+
+      // Create the monitors that handle preemption
+      createPolicyMonitors();
+
+      masterService = createApplicationMasterService();
+      addService(masterService) ;
+
+      applicationACLsManager = new ApplicationACLsManager(conf);
+
+      queueACLsManager = createQueueACLsManager(scheduler, conf);
+
+      rmAppManager = createRMAppManager();
+      // Register event handler for RMAppManagerEvents
+      rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
+      rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
+      rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
+      clientRM = createClientRMService();
+      rmContext.setClientRMService(clientRM);
+      addService(clientRM);
+
+      adminService = createAdminService(clientRM, masterService, resourceTracker);
+      addService(adminService);
+
+      applicationMasterLauncher = createAMLauncher();
+      rmDispatcher.register(AMLauncherEventType.class,
+          applicationMasterLauncher);
+
+      addService(applicationMasterLauncher);
+      if (UserGroupInformation.isSecurityEnabled()) {
+        addService(delegationTokenRenewer);
+        delegationTokenRenewer.setRMContext(rmContext);
+      }
+
+      new RMNMInfo(rmContext, scheduler);
+
+      super.serviceInit(conf);
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      amRmTokenSecretManager.start();
+      containerTokenSecretManager.start();
+      nmTokenSecretManager.start();
+
+      RMStateStore rmStore = rmContext.getStateStore();
+      // The state store needs to start irrespective of recoveryEnabled as apps
+      // need events to move to further states.
+      rmStore.start();
+
+      if(recoveryEnabled) {
+        try {
+          RMState state = rmStore.loadState();
+          recover(state);
+        } catch (Exception e) {
+          // TODO(HA): a load/recover failure (e.g. after being fenced) should
+          // make this RM give up Active status rather than terminate.
+          LOG.error("Failed to load/recover state", e);
+          ExitUtil.terminate(1, e);
+        }
+      }
+
+      startWepApp();
+      try {
+        rmDTSecretManager.startThreads();
+      } catch(IOException ie) {
+        throw new YarnRuntimeException("Failed to start secret manager threads", ie);
+      }
+
+      if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+        int port = webApp.port();
+        WebAppUtils.setRMWebAppPort(conf, port);
+      }
+
+      super.serviceStart();
+    }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      if (webApp != null) {
+        webApp.stop();
+      }
+      if (rmDTSecretManager != null) {
+        rmDTSecretManager.stopThreads();
+      }
+
+      if (amRmTokenSecretManager != null) {
+        amRmTokenSecretManager.stop();
+      }
+      if (containerTokenSecretManager != null) {
+        containerTokenSecretManager.stop();
+      }
+      if(nmTokenSecretManager != null) {
+        nmTokenSecretManager.stop();
+      }
+
+      DefaultMetricsSystem.shutdown();
+
+      if (rmContext != null) {
+        RMStateStore store = rmContext.getStateStore();
+        try {
+          store.close();
+        } catch (Exception e) {
+          LOG.error("Error closing store.", e);
+        }
+      }
+
+      super.serviceStop();
+    }
+  }
+
   @Private
   public static class SchedulerEventDispatcher extends AbstractService
       implements EventHandler<SchedulerEvent> {
@@ -592,10 +721,9 @@ public class ResourceManager extends Com
                 YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
             .withHttpSpnegoKeytabKey(
                 YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
-            .at(this.conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS)); 
-    String proxyHostAndPort = YarnConfiguration.getProxyHostAndPort(conf);
-    if(YarnConfiguration.getRMWebAppHostAndPort(conf).
+            .at(WebAppUtils.getRMWebAppURLWithoutScheme(conf)); 
+    String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf);
+    if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
         equals(proxyHostAndPort)) {
       AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService());
       builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, 
@@ -608,69 +736,55 @@ public class ResourceManager extends Com
     webApp = builder.start(new RMWebApp(this));
   }
 
-  @Override
-  protected void serviceStart() throws Exception {
-    try {
-      doSecureLogin();
-    } catch(IOException ie) {
-      throw new YarnRuntimeException("Failed to login", ie);
-    }
+  void setConf(Configuration configuration) {
+    conf = configuration;
+  }
 
-    this.amRmTokenSecretManager.start();
-    this.containerTokenSecretManager.start();
-    this.nmTokenSecretManager.start();
-
-    // Explicitly start DTRenewer too in secure mode before kicking recovery as
-    // tokens will start getting added for renewal as part of the recovery
-    // process itself.
-    if (UserGroupInformation.isSecurityEnabled()) {
-      this.delegationTokenRenewer.start();
-    }
-
-    RMStateStore rmStore = rmContext.getStateStore();
-    // The state store needs to start irrespective of recoveryEnabled as apps
-    // need events to move to further states.
-    rmStore.start();
+  /**
+   * Helper method to create and init {@link #activeServices}. This creates an
+   * instance of {@link RMActiveServices} and initializes it.
+   * @throws Exception
+   */
+  void createAndInitActiveServices() throws Exception {
+    activeServices = new RMActiveServices();
+    activeServices.init(conf);
+  }
 
-    if(recoveryEnabled) {
-      try {
-        RMState state = rmStore.loadState();
-        recover(state);
-      } catch (Exception e) {
-        // the Exception from loadState() needs to be handled for 
-        // HA and we need to give up master status if we got fenced
-        LOG.error("Failed to load/recover state", e);
-        ExitUtil.terminate(1, e);
-      }
+  /**
+   * Helper method to start {@link #activeServices}.
+   * @throws Exception
+   */
+  void startActiveServices() throws Exception {
+    if (activeServices != null) {
+      clusterTimeStamp = System.currentTimeMillis();
+      activeServices.start();
     }
+  }
 
-    startWepApp();
-    try {
-      rmDTSecretManager.startThreads();
-    } catch(IOException ie) {
-      throw new YarnRuntimeException("Failed to start secret manager threads", ie);
+  /**
+   * Helper method to stop {@link #activeServices}.
+   * @throws Exception
+   */
+  void stopActiveServices() throws Exception {
+    if (activeServices != null) {
+      activeServices.stop();
+      activeServices = null;
     }
+  }
+
+  @VisibleForTesting
+  protected boolean areActiveServicesRunning() {
+    return activeServices != null && activeServices.isInState(STATE.STARTED);
+  }
 
-    if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
-      String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
-                                        YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
-      hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
-      int port = webApp.port();
-      String resolvedAddress = hostname + ":" + port;
-      conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
+  @Override
+  protected void serviceStart() throws Exception {
+    try {
+      doSecureLogin();
+    } catch(IOException ie) {
+      throw new YarnRuntimeException("Failed to login", ie);
     }
-    
     super.serviceStart();
-
-    /*synchronized(shutdown) {
-      try {
-        while(!shutdown.get()) {
-          shutdown.wait();
-        }
-      } catch(InterruptedException ie) {
-        LOG.info("Interrupted while waiting", ie);
-      }
-    }*/
   }
   
   protected void doSecureLogin() throws IOException {
@@ -680,39 +794,6 @@ public class ResourceManager extends Com
 
   @Override
   protected void serviceStop() throws Exception {
-    if (webApp != null) {
-      webApp.stop();
-    }
-    if (rmDTSecretManager != null) {
-      rmDTSecretManager.stopThreads();
-    }
-
-    if (amRmTokenSecretManager != null) {
-      this.amRmTokenSecretManager.stop();
-    }
-    if (containerTokenSecretManager != null) {
-      this.containerTokenSecretManager.stop();
-    }
-    if(nmTokenSecretManager != null) {
-      nmTokenSecretManager.stop();
-    }
-
-    /*synchronized(shutdown) {
-      shutdown.set(true);
-      shutdown.notifyAll();
-    }*/
-
-    DefaultMetricsSystem.shutdown();
-
-    if (rmContext != null) {
-      RMStateStore store = rmContext.getStateStore();
-      try {
-        store.close();
-      } catch (Exception e) {
-        LOG.error("Error closing store.", e);
-      }
-    }
-
     super.serviceStop();
   }
   
@@ -740,7 +821,8 @@ public class ResourceManager extends Com
 
   protected ClientRMService createClientRMService() {
     return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
-        this.applicationACLsManager, this.rmDTSecretManager);
+        this.applicationACLsManager, this.queueACLsManager,
+        this.rmDTSecretManager);
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
@@ -821,6 +903,11 @@ public class ResourceManager extends Com
   }
 
   @Private
+  public QueueACLsManager getQueueACLsManager() {
+    return this.queueACLsManager;
+  }
+
+  @Private
   public RMContainerTokenSecretManager getRMContainerTokenSecretManager() {
     return this.containerTokenSecretManager;
   }
@@ -843,7 +930,7 @@ public class ResourceManager extends Com
     // recover applications
     rmAppManager.recover(state);
   }
-  
+
   public static void main(String argv[]) {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
@@ -853,6 +940,7 @@ public class ResourceManager extends Com
       ShutdownHookManager.get().addShutdownHook(
         new CompositeServiceShutdownHook(resourceManager),
         SHUTDOWN_HOOK_PRIORITY);
+      setHttpPolicy(conf);
       resourceManager.init(conf);
       resourceManager.start();
     } catch (Throwable t) {
@@ -860,4 +948,10 @@ public class ResourceManager extends Com
       System.exit(-1);
     }
   }
+  
+  private static void setHttpPolicy(Configuration conf) {
+    HttpConfig.setPolicy(Policy.fromString(conf.get(
+      YarnConfiguration.YARN_HTTP_POLICY_KEY,
+      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Oct 30 22:21:59 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 public class ResourceTrackerService extends AbstractService implements
     ResourceTracker {
@@ -73,6 +75,7 @@ public class ResourceTrackerService exte
   private long nextHeartBeatInterval;
   private Server server;
   private InetSocketAddress resourceTrackerAddress;
+  private String minimumNodeManagerVersion;
 
   private static final NodeHeartbeatResponse resync = recordFactory
       .newRecordInstance(NodeHeartbeatResponse.class);
@@ -124,7 +127,16 @@ public class ResourceTrackerService exte
     minAllocVcores = conf.getInt(
     	YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
     	YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
-    
+
+    minimumNodeManagerVersion = conf.get(
+        YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
+        YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
+    // Resolve the "EqualToRM" alias once at init time rather than rewriting
+    // this shared field from concurrent registerNodeManager() RPC handlers.
+    if (minimumNodeManagerVersion.equals("EqualToRM")) {
+      minimumNodeManagerVersion = YarnVersionInfo.getVersion();
+    }
+
     super.serviceInit(conf);
   }
 
@@ -172,10 +184,29 @@ public class ResourceTrackerService exte
     int cmPort = nodeId.getPort();
     int httpPort = request.getHttpPort();
     Resource capability = request.getResource();
+    String nodeManagerVersion = request.getNMVersion();
 
     RegisterNodeManagerResponse response = recordFactory
         .newRecordInstance(RegisterNodeManagerResponse.class);
 
+    if (!minimumNodeManagerVersion.equals("NONE")) {
+      // Reject NodeManagers that report no version or a version older than
+      // the configured minimum.
+      if (nodeManagerVersion == null ||
+          VersionUtil.compareVersions(nodeManagerVersion,
+              minimumNodeManagerVersion) < 0) {
+        String message =
+            "Disallowed NodeManager Version " + nodeManagerVersion
+                + ", is less than the minimum version "
+                + minimumNodeManagerVersion + " sending SHUTDOWN signal to "
+                + "NodeManager.";
+        LOG.info(message);
+        response.setDiagnosticsMessage(message);
+        response.setNodeAction(NodeAction.SHUTDOWN);
+        return response;
+      }
+    }
+
     // Check if this node is a 'valid' node
     if (!this.nodesListManager.isValidNode(host)) {
       String message =
@@ -206,7 +237,7 @@ public class ResourceTrackerService exte
         .getCurrentKey());    
 
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
-        resolve(host), capability);
+        resolve(host), capability, nodeManagerVersion);
 
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {
@@ -229,7 +260,8 @@ public class ResourceTrackerService exte
             + ", assigned nodeId " + nodeId;
     LOG.info(message);
     response.setNodeAction(NodeAction.NORMAL);
-    response.setRMIdentifier(ResourceManager.clusterTimeStamp);
+    response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
+    response.setRMVersion(YarnVersionInfo.getVersion());
     return response;
   }
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Wed Oct 30 22:21:59 2013
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -63,12 +64,6 @@ public class FileSystemRMStateStore exte
   public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
 
   private static final String ROOT_DIR_NAME = "FSRMStateRoot";
-  private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
-  private static final String RM_APP_ROOT = "RMAppRoot";
-  private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
-  private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
-  private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
-      "RMDTSequenceNumber_";
 
   protected FileSystem fs;
 
@@ -124,6 +119,9 @@ public class FileSystemRMStateStore exte
         for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
           assert childNodeStatus.isFile();
           String childNodeName = childNodeStatus.getPath().getName();
+          if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+            continue;
+          }
           byte[] childData =
               readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
           if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
@@ -184,12 +182,28 @@ public class FileSystemRMStateStore exte
     }
   }
 
+  private boolean checkAndRemovePartialRecord(Path record) throws IOException {
+    // A name ending in ".tmp" marks a record whose write into the state
+    // store did not complete (see writeFile). Such a partial file is deleted
+    // here and true is returned so the caller skips it.
+    if (record.getName().endsWith(".tmp")) {
+      LOG.error("incomplete rm state store entry found :"
+          + record);
+      fs.delete(record, false);
+      return true;
+    }
+    return false;
+  }
+
   private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
     FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
 
     for(FileStatus childNodeStatus : childNodes) {
       assert childNodeStatus.isFile();
       String childNodeName = childNodeStatus.getPath().getName();
+      if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+        continue;
+      }
       if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
         rmState.rmSecretManagerState.dtSequenceNumber =
             Integer.parseInt(childNodeName.split("_")[1]);
@@ -350,10 +364,26 @@ public class FileSystemRMStateStore exte
     return data;
   }
 
+  /*
+   * In order to make this write atomic, data is first written to a .tmp file
+   * which is then renamed to outputPath. This assumes rename is atomic on the
+   * underlying file system. A failed rename raises an IOException.
+   */
   private void writeFile(Path outputPath, byte[] data) throws Exception {
-    FSDataOutputStream fsOut = fs.create(outputPath, false);
-    fsOut.write(data);
-    fsOut.close();
+    Path tempPath =
+        new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
+    FSDataOutputStream fsOut = fs.create(tempPath, false);
+    try {
+      fsOut.write(data);
+    } finally {
+      // close even if write fails so the stream is not leaked
+      fsOut.close();
+    }
+    if (!fs.rename(tempPath, outputPath)) {
+      throw new IOException("Failed to rename " + tempPath + " to "
+          + outputPath);
+    }
   }
 
   private boolean renameFile(Path src, Path dst) throws Exception {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Wed Oct 30 22:21:59 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -108,7 +109,9 @@ public class MemoryRMStateStore extends 
 
     ApplicationState appState = state.getApplicationState().get(
         attemptState.getAttemptId().getApplicationId());
-    assert appState != null;
+    if (appState == null) {
+      throw new YarnRuntimeException("Application doesn't exist");
+    }
 
     if (appState.attempts.containsKey(attemptState.getAttemptId())) {
       Exception e = new IOException("Attempt: " +
@@ -125,7 +128,9 @@ public class MemoryRMStateStore extends 
                                                              throws Exception {
     ApplicationId appId = appState.getAppId();
     ApplicationState removed = state.appState.remove(appId);
-    assert removed != null;
+    if (removed == null) {
+      throw new YarnRuntimeException("Removing non-existing application state");
+    }
   }
 
   @Override

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Wed Oct 30 22:21:59 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
 
@@ -64,6 +65,14 @@ import org.apache.hadoop.yarn.server.res
  */
 public abstract class RMStateStore extends AbstractService {
 
+  // constants for RM App state and RMDTSecretManagerState.
+  protected static final String RM_APP_ROOT = "RMAppRoot";
+  protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
+  protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
+  protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
+  protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
+      "RMDTSequenceNumber_";
+
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
   public RMStateStore() {
@@ -463,8 +472,9 @@ public abstract class RMStateStore exten
               (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
                   .newApplicationAttemptStateData(attemptState.getAttemptId(),
                     attemptState.getMasterContainer(), appAttemptTokens);
-
-            LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+            }
             storeApplicationAttemptState(attemptState.getAttemptId().toString(), 
                                          attemptStateData);
           } catch (Exception e) {
@@ -482,12 +492,15 @@ public abstract class RMStateStore exten
           ApplicationState appState = 
                           ((RMStateStoreRemoveAppEvent) event).getAppState();
           ApplicationId appId = appState.getAppId();
-          
+          Exception removedException = null;
           LOG.info("Removing info for app: " + appId);
           try {
             removeApplicationState(appState);
           } catch (Exception e) {
             LOG.error("Error removing app: " + appId, e);
+            removedException = e;
+          } finally {
+            notifyDoneRemovingApplication(appId, removedException);
           }
         }
         break;
@@ -521,7 +534,18 @@ public abstract class RMStateStore exten
     rmDispatcher.getEventHandler().handle(
         new RMAppAttemptStoredEvent(attemptId, storedException));
   }
-  
+
+  /**
+   * Notify the RMApp that removing its state from the RMStateStore finished.
+   * removedException is null on success, else the failure that occurred.
+   */
+  @SuppressWarnings("unchecked")
+  private void notifyDoneRemovingApplication(ApplicationId appId,
+      Exception removedException) {
+    rmDispatcher.getEventHandler().handle(
+      new RMAppRemovedEvent(appId, removedException));
+  }
+
   /**
    * EventHandler implementation which forward events to the FSRMStateStore
    * This hides the EventHandle methods of the store from its public interface 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Wed Oct 30 22:21:59 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -194,4 +195,20 @@ public interface RMApp extends EventHand
    * @return the application type.
    */
   String getApplicationType(); 
+
+  /**
+   * Check whether this application is safe to unregister.
+   * An application is deemed to be safe to unregister if it is an unmanaged
+   * AM or its state has been removed from state store.
+   * @return the flag which indicates whether this application is safe to
+   *         unregister.
+   */
+  boolean isAppSafeToUnregister();
+
+  /**
+   * Create the external user-facing state of this application from the
+   * current state of the {@link RMApp}.
+   * @return the external user-facing state of this application.
+   */
+  YarnApplicationState createApplicationState();
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Wed Oct 30 22:21:59 2013
@@ -27,11 +27,14 @@ public enum RMAppEventType {
   // Source: RMAppAttempt
   APP_REJECTED,
   APP_ACCEPTED,
-  APP_SAVED,
   ATTEMPT_REGISTERED,
-  ATTEMPT_FINISHING,
+  ATTEMPT_UNREGISTERED,
   ATTEMPT_FINISHED, // Will send the final state
   ATTEMPT_FAILED,
   ATTEMPT_KILLED,
-  NODE_UPDATE
+  NODE_UPDATE,
+
+  // Source: RMStateStore
+  APP_SAVED,
+  APP_REMOVED // app state removal from the store has completed
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Oct 30 22:21:59 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -109,6 +111,8 @@ public class RMAppImpl implements RMApp,
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
+  private boolean isAppRemovalRequestSent = false; // avoids duplicate removals
+  private RMAppState previousStateAtRemoving; // state reported while REMOVING
 
   private static final StateMachineFactory<RMAppImpl,
                                            RMAppState,
@@ -167,8 +171,9 @@ public class RMAppImpl implements RMApp,
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
-        RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
+          RMAppEventType.ATTEMPT_UNREGISTERED,
+        new RMAppRemovingTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
     .addTransition(RMAppState.RUNNING,
@@ -178,6 +183,17 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
 
+     // Transitions from REMOVING state
+    .addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
+        RMAppEventType.APP_REMOVED,  new RMAppFinishingTransition())
+    .addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
+        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.REMOVING, RMAppState.KILLED,
+        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    // ignorable transitions
+    .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
+        RMAppEventType.NODE_UPDATE)
+
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
@@ -185,36 +201,34 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-        RMAppEventType.NODE_UPDATE)
+      EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
 
      // Transitions from FINISHED state
-    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
-        RMAppEventType.KILL)
      // ignorable transitions
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
         EnumSet.of(
             RMAppEventType.NODE_UPDATE,
-            RMAppEventType.ATTEMPT_FINISHING,
-            RMAppEventType.ATTEMPT_FINISHED))
+            RMAppEventType.ATTEMPT_UNREGISTERED,
+            RMAppEventType.ATTEMPT_FINISHED,
+            RMAppEventType.KILL,
+            RMAppEventType.APP_REMOVED))
 
      // Transitions from FAILED state
-    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED))
      // ignorable transitions
-    .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
-        RMAppEventType.NODE_UPDATE)
+    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+          RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
 
      // Transitions from KILLED state
+     // ignorable transitions
     .addTransition(
         RMAppState.KILLED,
         RMAppState.KILLED,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED))
-     // ignorable transitions
-    .addTransition(RMAppState.KILLED, RMAppState.KILLED,
-        RMAppEventType.NODE_UPDATE)
+            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE,
+            RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
 
      .installTopology();
 
@@ -384,6 +398,7 @@ public class RMAppImpl implements RMApp,
     case SUBMITTED:
     case ACCEPTED:
     case RUNNING:
+    case REMOVING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
     case FINISHING:
@@ -432,18 +447,18 @@ public class RMAppImpl implements RMApp,
           currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
           trackingUrl = this.currentAttempt.getTrackingUrl();
           origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
-          if (UserGroupInformation.isSecurityEnabled()
-              && clientUserName != null) {
+          if (UserGroupInformation.isSecurityEnabled()) {
+            // get a token so the client can communicate with the app attempt
+            // NOTE: token may be unavailable if the attempt is not running
             Token<ClientToAMTokenIdentifier> attemptClientToAMToken =
-                new Token<ClientToAMTokenIdentifier>(
-                    new ClientToAMTokenIdentifier(
-                        currentApplicationAttemptId, clientUserName),
-                        rmContext.getClientToAMTokenSecretManager());
-            clientToAMToken = BuilderUtils.newClientToAMToken(
-                attemptClientToAMToken.getIdentifier(),
-                attemptClientToAMToken.getKind().toString(),
-                attemptClientToAMToken.getPassword(),
-                attemptClientToAMToken.getService().toString());
+                this.currentAttempt.createClientToken(clientUserName);
+            if (attemptClientToAMToken != null) {
+              clientToAMToken = BuilderUtils.newClientToAMToken(
+                  attemptClientToAMToken.getIdentifier(),
+                  attemptClientToAMToken.getKind().toString(),
+                  attemptClientToAMToken.getPassword(),
+                  attemptClientToAMToken.getService().toString());
+            }
           }
           host = this.currentAttempt.getHost();
           rpcPort = this.currentAttempt.getRpcPort();
@@ -475,7 +490,7 @@ public class RMAppImpl implements RMApp,
       return BuilderUtils.newApplicationReport(this.applicationId,
           currentApplicationAttemptId, this.user, this.queue,
           this.name, host, rpcPort, clientToAMToken,
-          RMServerUtils.createApplicationState(this.stateMachine.getCurrentState()), diags,
+          createApplicationState(), diags,
           trackingUrl, this.startTime, this.finishTime, finishState,
           appUsageReport, origTrackingUrl, progress, this.applicationType, 
           amrmToken);
@@ -569,7 +584,7 @@ public class RMAppImpl implements RMApp,
   }
   
   @Override
-  public void recover(RMState state) {
+  public void recover(RMState state) throws Exception{
     ApplicationState appState = state.getApplicationState().get(getApplicationId());
     LOG.info("Recovering app: " + getApplicationId() + " with " + 
             + appState.getAttemptCount() + " attempts");
@@ -637,10 +652,18 @@ public class RMAppImpl implements RMApp,
     };
   }
 
-  private static final class RMAppFinishingTransition extends
-      RMAppTransition {
+  private static final class RMAppFinishingTransition extends RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
+      if (event.getType().equals(RMAppEventType.APP_REMOVED)) {
+        RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event;
+        if (removeEvent.getRemovedException() != null) {
+          LOG.error(
+            "Failed to remove application: " + removeEvent.getApplicationId(),
+            removeEvent.getRemovedException());
+          ExitUtil.terminate(1, removeEvent.getRemovedException());
+        }
+      }
       app.finishTime = System.currentTimeMillis();
     }
   }
@@ -657,6 +680,15 @@ public class RMAppImpl implements RMApp,
     }
   }
 
+  private static final class RMAppRemovingTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      LOG.info("Removing application with id " + app.applicationId);
+      app.removeApplicationState();
+      app.previousStateAtRemoving = app.getState();
+    }
+  }
+
   private static class AppFinishedTransition extends FinalTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
       RMAppFinishedAttemptEvent finishedEvent =
@@ -712,6 +744,9 @@ public class RMAppImpl implements RMApp,
       if (app.getState() != RMAppState.FINISHING) {
         app.finishTime = System.currentTimeMillis();
       }
+      // The application is completely done; remove it from the state store.
+      app.removeApplicationState();
+
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -764,4 +799,52 @@ public class RMAppImpl implements RMApp,
   public String getApplicationType() {
     return this.applicationType;
   }
+
+  @Override
+  public boolean isAppSafeToUnregister() {
+    RMAppState state = getState();
+    return state.equals(RMAppState.FINISHING)
+        || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
+        || state.equals(RMAppState.KILLED) ||
+        // If this is an unmanaged AM, we are safe to unregister since unmanaged
+        // AM will immediately go to FINISHED state on AM unregistration
+        getApplicationSubmissionContext().getUnmanagedAM();
+  }
+
+  @Override
+  public YarnApplicationState createApplicationState() {
+    RMAppState rmAppState = getState();
+    // If App is in REMOVING state, return its previous state.
+    if (rmAppState.equals(RMAppState.REMOVING)) {
+      rmAppState = previousStateAtRemoving;
+    }
+    switch (rmAppState) {
+    case NEW:
+      return YarnApplicationState.NEW;
+    case NEW_SAVING:
+      return YarnApplicationState.NEW_SAVING;
+    case SUBMITTED:
+      return YarnApplicationState.SUBMITTED;
+    case ACCEPTED:
+      return YarnApplicationState.ACCEPTED;
+    case RUNNING:
+      return YarnApplicationState.RUNNING;
+    case FINISHING:
+    case FINISHED:
+      return YarnApplicationState.FINISHED;
+    case KILLED:
+      return YarnApplicationState.KILLED;
+    case FAILED:
+      return YarnApplicationState.FAILED;
+    default:
+      throw new YarnRuntimeException("Unknown state passed!");
+    }
+  }
+
+  private void removeApplicationState(){
+    if (!isAppRemovalRequestSent) {
+      rmContext.getStateStore().removeApplication(this);
+      isAppRemovalRequestSent = true;
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Wed Oct 30 22:21:59 2013
@@ -24,6 +24,7 @@ public enum RMAppState {
   SUBMITTED,
   ACCEPTED,
   RUNNING,
+  REMOVING, // awaiting removal of the app state from the RMStateStore
   FINISHING,
   FINISHED,
   FAILED,

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Wed Oct 30 22:21:59 2013
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import javax.crypto.SecretKey;
 
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 
 /**
@@ -150,12 +152,22 @@ public interface RMAppAttempt extends Ev
   Token<AMRMTokenIdentifier> getAMRMToken();
 
   /**
-   * The master key for client-to-AM tokens for this app attempt
+   * The master key for client-to-AM tokens for this app attempt. This is only
+   * used for RMStateStore. Normal operation must invoke the secret manager to
+   * get the key and not use the local key directly.
    * @return The master key for client-to-AM tokens for this app attempt
    */
+  @LimitedPrivate("RMStateStore")
   SecretKey getClientTokenMasterKey();
 
   /**
+   * Create a token for authenticating a client connection to the app attempt
+   * @param clientName the name of the client requesting the token
+   * @return the token or null if the attempt is not running
+   */
+  Token<ClientToAMTokenIdentifier> createClientToken(String clientName);
+
+  /**
    * Get application container and resource usage information.
    * @return an ApplicationResourceUsageReport object.
    */

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Oct 30 22:21:59 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -56,11 +57,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
@@ -89,6 +90,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -97,6 +99,7 @@ import org.apache.hadoop.yarn.state.Sing
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@@ -378,7 +381,7 @@ public class RMAppAttemptImpl implements
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
 
-    this.proxiedTrackingUrl = generateProxyUriWithoutScheme();
+    this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
     
     this.stateMachine = stateMachineFactory.make(this);
     this.user = user;
@@ -440,7 +443,8 @@ public class RMAppAttemptImpl implements
   public String getTrackingUrl() {
     this.readLock.lock();
     try {
-      return this.proxiedTrackingUrl;
+      return (getSubmissionContext().getUnmanagedAM()) ? 
+              this.origTrackingUrl : this.proxiedTrackingUrl;
     } finally {
       this.readLock.unlock();
     }
@@ -466,22 +470,17 @@ public class RMAppAttemptImpl implements
     }    
   }
   
-  private String generateProxyUriWithoutScheme() {
-    return generateProxyUriWithoutScheme(null);
-  }
-  
-  private String generateProxyUriWithoutScheme(
+  private String generateProxyUriWithScheme(
       final String trackingUriWithoutScheme) {
     this.readLock.lock();
     try {
       URI trackingUri = StringUtils.isEmpty(trackingUriWithoutScheme) ? null :
         ProxyUriUtils.getUriFromAMUrl(trackingUriWithoutScheme);
-      String proxy = YarnConfiguration.getProxyHostAndPort(conf);
+      String proxy = WebAppUtils.getProxyHostAndPort(conf);
       URI proxyUri = ProxyUriUtils.getUriFromAMUrl(proxy);
       URI result = ProxyUriUtils.getProxyUri(trackingUri, proxyUri,
           applicationAttemptId.getApplicationId());
-      //We need to strip off the scheme to have it match what was there before
-      return result.toASCIIString().substring(HttpConfig.getSchemePrefix().length());
+      return result.toASCIIString();
     } catch (URISyntaxException e) {
       LOG.warn("Could not proxify "+trackingUriWithoutScheme,e);
       return trackingUriWithoutScheme;
@@ -492,11 +491,13 @@ public class RMAppAttemptImpl implements
 
   private void setTrackingUrlToRMAppPage() {
     origTrackingUrl = pjoin(
-        YarnConfiguration.getRMWebAppHostAndPort(conf),
+        WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
         "cluster", "app", getAppAttemptId().getApplicationId());
     proxiedTrackingUrl = origTrackingUrl;
   }
 
+  // This is only used for RMStateStore. Normal operation must invoke the secret
+  // manager to get the key and not use the local key directly.
   @Override
   public SecretKey getClientTokenMasterKey() {
     return this.clientTokenMasterKey;
@@ -508,6 +509,26 @@ public class RMAppAttemptImpl implements
   }
 
   @Override
+  public Token<ClientToAMTokenIdentifier> createClientToken(String client) {
+    this.readLock.lock();
+
+    try {
+      Token<ClientToAMTokenIdentifier> token = null;
+      ClientToAMTokenSecretManagerInRM secretMgr =
+          this.rmContext.getClientToAMTokenSecretManager();
+      if (client != null &&
+          secretMgr.getMasterKey(this.applicationAttemptId) != null) {
+        token = new Token<ClientToAMTokenIdentifier>(
+            new ClientToAMTokenIdentifier(this.applicationAttemptId, client),
+            secretMgr);
+      }
+      return token;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public String getDiagnostics() {
     this.readLock.lock();
 
@@ -652,7 +673,7 @@ public class RMAppAttemptImpl implements
   }
 
   @Override
-  public void recover(RMState state) {
+  public void recover(RMState state) throws Exception{
     ApplicationState appState = 
         state.getApplicationState().get(getAppAttemptId().getApplicationId());
     ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
@@ -667,7 +688,8 @@ public class RMAppAttemptImpl implements
                                  RMAppAttemptEventType.RECOVER));
   }
 
-  private void recoverAppAttemptCredentials(Credentials appAttemptTokens) {
+  private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
+      throws IOException {
     if (appAttemptTokens == null) {
       return;
     }
@@ -684,11 +706,7 @@ public class RMAppAttemptImpl implements
     this.amrmToken =
         (Token<AMRMTokenIdentifier>) appAttemptTokens
           .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
-
-    // For now, no need to populate tokens back to AMRMTokenSecretManager,
-    // because running attempts are rebooted. Later in work-preserve restart,
-    // we'll create NEW->RUNNING transition in which the restored tokens will be
-    // added to the secret manager
+    rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
   }
 
   private static class BaseTransition implements
@@ -713,9 +731,9 @@ public class RMAppAttemptImpl implements
           .registerAppAttempt(appAttempt.applicationAttemptId);
 
       if (UserGroupInformation.isSecurityEnabled()) {
-        appAttempt.clientTokenMasterKey = appAttempt.rmContext
-            .getClientToAMTokenSecretManager()
-            .registerApplication(appAttempt.applicationAttemptId);
+        appAttempt.clientTokenMasterKey =
+            appAttempt.rmContext.getClientToAMTokenSecretManager()
+              .createMasterKey(appAttempt.applicationAttemptId);
       }
 
       // create AMRMToken
@@ -810,7 +828,11 @@ public class RMAppAttemptImpl implements
       Allocation amContainerAllocation = appAttempt.scheduler.allocate(
           appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
           EMPTY_CONTAINER_RELEASE_LIST, null, null);
-
+      // There must be at least one container allocated, because a
+      // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
+      // and is put in SchedulerApplication#newlyAllocatedContainers. Then,
+      // YarnScheduler#allocate will fetch it.
+      assert amContainerAllocation.getContainers().size() != 0;
       // Set the masterContainer
       appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
                                                                            0));
@@ -897,6 +919,12 @@ public class RMAppAttemptImpl implements
                             RMAppAttemptEvent event) {
       // Register with AMLivelinessMonitor
       appAttempt.attemptLaunched();
+
+      // register the ClientTokenMasterKey after it is saved in the store,
+      // otherwise client may hold an invalid ClientToken after RM restarts.
+      appAttempt.rmContext.getClientToAMTokenSecretManager()
+      .registerApplication(appAttempt.getAppAttemptId(),
+        appAttempt.getClientTokenMasterKey());
     }
   }
   
@@ -970,9 +998,10 @@ public class RMAppAttemptImpl implements
           = (RMAppAttemptRegistrationEvent) event;
       appAttempt.host = registrationEvent.getHost();
       appAttempt.rpcPort = registrationEvent.getRpcport();
-      appAttempt.origTrackingUrl = registrationEvent.getTrackingurl();
+      appAttempt.origTrackingUrl =
+          sanitizeTrackingUrl(registrationEvent.getTrackingurl());
       appAttempt.proxiedTrackingUrl = 
-        appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
+        appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
 
       // Let the app know
       appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
@@ -1105,9 +1134,10 @@ public class RMAppAttemptImpl implements
       RMAppAttemptUnregistrationEvent unregisterEvent
         = (RMAppAttemptUnregistrationEvent) event;
       appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
-      appAttempt.origTrackingUrl = unregisterEvent.getTrackingUrl();
+      appAttempt.origTrackingUrl =
+          sanitizeTrackingUrl(unregisterEvent.getTrackingUrl());
       appAttempt.proxiedTrackingUrl = 
-        appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
+        appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
       appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
 
       // Tell the app
@@ -1122,7 +1152,7 @@ public class RMAppAttemptImpl implements
       ApplicationId applicationId =
           appAttempt.getAppAttemptId().getApplicationId();
       appAttempt.eventHandler.handle(
-          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING));
+          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
       return RMAppAttemptState.FINISHING;
     }
   }
@@ -1259,4 +1289,8 @@ public class RMAppAttemptImpl implements
     appAttempt.rmContext.getAMRMTokenSecretManager()
       .applicationMasterFinished(appAttempt.getAppAttemptId());
   }
+
+  private static String sanitizeTrackingUrl(String url) {
+    return (url == null || url.trim().isEmpty()) ? "N/A" : url;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Wed Oct 30 22:21:59 2013
@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 
 /**
  * Node managers information on available resources 
@@ -84,7 +83,13 @@ public interface RMNode {
    * @return the time of the latest health report received from this node.
    */
   public long getLastHealthReportTime();
-  
+
+  /**
+   * the node manager version of the node received as part of the
+   * registration with the resource manager
+   */
+  public String getNodeManagerVersion();
+
   /**
    * the total available resource.
    * @return the total available resource.

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Oct 30 22:21:59 2013
@@ -97,6 +97,7 @@ public class RMNodeImpl implements RMNod
 
   private String healthReport;
   private long lastHealthReportTime;
+  private String nodeManagerVersion;
 
   /* set of containers that have just launched */
   private final Map<ContainerId, ContainerStatus> justLaunchedContainers = 
@@ -172,7 +173,7 @@ public class RMNodeImpl implements RMNod
                              RMNodeEvent> stateMachine;
 
   public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
-      int cmPort, int httpPort, Node node, Resource capability) {
+      int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) {
     this.nodeId = nodeId;
     this.context = context;
     this.hostName = hostName;
@@ -184,6 +185,7 @@ public class RMNodeImpl implements RMNod
     this.node = node;
     this.healthReport = "Healthy";
     this.lastHealthReportTime = System.currentTimeMillis();
+    this.nodeManagerVersion = nodeManagerVersion;
 
     this.latestNodeHeartBeatResponse.setResponseId(0);
 
@@ -289,6 +291,11 @@ public class RMNodeImpl implements RMNod
   }
 
   @Override
+  public String getNodeManagerVersion() {
+    return nodeManagerVersion;
+  }
+
+  @Override
   public NodeState getState() {
     this.readLock.lock();
 
@@ -460,8 +467,11 @@ public class RMNodeImpl implements RMNod
           && rmNode.getHttpPort() == newNode.getHttpPort()) {
         // Reset heartbeat ID since node just restarted.
         rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
-        rmNode.context.getDispatcher().getEventHandler().handle(
-            new NodeAddedSchedulerEvent(rmNode));
+        if (rmNode.getState() != NodeState.UNHEALTHY) {
+          // Only add new node if old state is not UNHEALTHY
+          rmNode.context.getDispatcher().getEventHandler().handle(
+              new NodeAddedSchedulerEvent(rmNode));
+         }
       } else {
         // Reconnected node differs, so replace old node and start new node
         switch (rmNode.getState()) {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Oct 30 22:21:59 2013
@@ -116,14 +116,11 @@ public class AppSchedulingInfo {
    * The ApplicationMaster is updating resource requirements for the
    * application, by asking for more resources and releasing resources acquired
    * by the application.
-   * 
+   *
    * @param requests resources to be acquired
-   * @param blacklistAdditions resources to be added to the blacklist
-   * @param blacklistRemovals resources to be removed from the blacklist
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests,
-      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+      List<ResourceRequest> requests) {
     QueueMetrics metrics = queue.getMetrics();
     
     // Update resource requests
@@ -181,11 +178,16 @@ public class AppSchedulingInfo {
                 lastRequestContainers)));
       }
     }
+  }
 
-    //
-    // Update blacklist
-    //
-    
+  /**
+   * The ApplicationMaster is updating the blacklist
+   *
+   * @param blacklistAdditions resources to be added to the blacklist
+   * @param blacklistRemovals resources to be removed from the blacklist
+   */
+  synchronized public void updateBlacklist(
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
     // Add to blacklist
     if (blacklistAdditions != null) {
       blacklist.addAll(blacklistAdditions);

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Wed Oct 30 22:21:59 2013
@@ -19,12 +19,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -45,12 +43,6 @@ public interface Queue {
   QueueMetrics getMetrics();
 
   /**
-   * Get ACLs for the queue.
-   * @return ACLs for the queue
-   */
-  public Map<QueueACL, AccessControlList> getQueueAcls();
-  
-  /**
    * Get queue information
    * @param includeChildQueues include child queues?
    * @param recursive recursively get child queue information?
@@ -64,4 +56,6 @@ public interface Queue {
    * @return queue ACLs for user
    */
   List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
+
+  boolean hasAccess(QueueACL acl, UserGroupInformation user);
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Wed Oct 30 22:21:59 2013
@@ -73,7 +73,7 @@ public class QueueMetrics implements Met
   @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
   @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
   @Metric("# of active users") MutableGaugeInt activeUsers;
-  @Metric("# of active users") MutableGaugeInt activeApplications;
+  @Metric("# of active applications") MutableGaugeInt activeApplications;
   private final MutableGaugeInt[] runningTime;
   private TimeBucketMetrics<ApplicationId> runBuckets;
 



Mime
View raw message