hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r1532967 [5/9] - in /hadoop/common/branches/HDFS-4949/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java...
Date Thu, 17 Oct 2013 05:33:06 GMT
Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Oct 17 05:32:42 2013
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.http.HttpConfig.Policy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
@@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
@@ -88,6 +91,7 @@ import org.apache.hadoop.yarn.server.web
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -105,7 +109,14 @@ public class ResourceManager extends Com
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
-  public static final long clusterTimeStamp = System.currentTimeMillis();
+  private static long clusterTimeStamp = System.currentTimeMillis();
+
+  /**
+   * "Always On" services. Services that must run at all times, irrespective
+   * of the HA state of the RM.
+   */
+  @VisibleForTesting
+  protected RMHAProtocolService haService;
 
   /**
    * "Active" services. Services that need to run only on the Active RM.
@@ -137,6 +148,7 @@ public class ResourceManager extends Com
   private EventHandler<SchedulerEvent> schedulerDispatcher;
   protected RMAppManager rmAppManager;
   protected ApplicationACLsManager applicationACLsManager;
+  protected QueueACLsManager queueACLsManager;
   protected RMDelegationTokenSecretManager rmDTSecretManager;
   private DelegationTokenRenewer delegationTokenRenewer;
   private WebApp webApp;
@@ -155,17 +167,31 @@ public class ResourceManager extends Com
   public RMContext getRMContext() {
     return this.rmContext;
   }
-  
+
+  public static long getClusterTimeStamp() {
+    return clusterTimeStamp;
+  }
+
+  @VisibleForTesting
+  protected static void setClusterTimeStamp(long timestamp) {
+    clusterTimeStamp = timestamp;
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     validateConfigs(conf);
     this.conf = conf;
 
-    activeServices = new RMActiveServices();
-    addService(activeServices);
+    haService = new RMHAProtocolService(this);
+    addService(haService);
     super.serviceInit(conf);
   }
   
+  protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
+      Configuration conf) {
+    return new QueueACLsManager(scheduler, conf);
+  }
+
   @VisibleForTesting
   protected void setRMStateStore(RMStateStore rmStore) {
     rmStore.setRMDispatcher(rmDispatcher);
@@ -372,6 +398,8 @@ public class ResourceManager extends Com
 
       applicationACLsManager = new ApplicationACLsManager(conf);
 
+      queueACLsManager = createQueueACLsManager(scheduler, conf);
+
       rmAppManager = createRMAppManager();
       // Register event handler for RMAppManagerEvents
       rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
@@ -430,12 +458,8 @@ public class ResourceManager extends Com
       }
 
       if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
-        String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
-        hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
         int port = webApp.port();
-        String resolvedAddress = hostname + ":" + port;
-        conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
+        WebAppUtils.setRMWebAppPort(conf, port);
       }
 
       super.serviceStart();
@@ -470,6 +494,7 @@ public class ResourceManager extends Com
           LOG.error("Error closing store.", e);
         }
       }
+
       super.serviceStop();
     }
   }
@@ -692,10 +717,9 @@ public class ResourceManager extends Com
                 YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
             .withHttpSpnegoKeytabKey(
                 YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
-            .at(this.conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS)); 
-    String proxyHostAndPort = YarnConfiguration.getProxyHostAndPort(conf);
-    if(YarnConfiguration.getRMWebAppHostAndPort(conf).
+            .at(WebAppUtils.getRMWebAppURLWithoutScheme(conf)); 
+    String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf);
+    if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
         equals(proxyHostAndPort)) {
       AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService());
       builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, 
@@ -708,6 +732,47 @@ public class ResourceManager extends Com
     webApp = builder.start(new RMWebApp(this));
   }
 
+  void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  /**
+   * Helper method to create and init {@link #activeServices}. This creates an
+   * instance of {@link RMActiveServices} and initializes it.
+   * @throws Exception if initializing the active services fails
+   */
+  void createAndInitActiveServices() throws Exception {
+    activeServices = new RMActiveServices();
+    activeServices.init(conf);
+  }
+
+  /**
+   * Helper method to start {@link #activeServices}.
+   * @throws Exception
+   */
+  void startActiveServices() throws Exception {
+    if (activeServices != null) {
+      clusterTimeStamp = System.currentTimeMillis();
+      activeServices.start();
+    }
+  }
+
+  /**
+   * Helper method to stop {@link #activeServices}.
+   * @throws Exception if the active services fail to stop
+   */
+  void stopActiveServices() throws Exception {
+    if (activeServices != null) {
+      activeServices.stop();
+      activeServices = null;
+    }
+  }
+
+  @VisibleForTesting
+  protected boolean areActiveServicesRunning() {
+    return activeServices != null && activeServices.isInState(STATE.STARTED);
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     try {
@@ -715,7 +780,6 @@ public class ResourceManager extends Com
     } catch(IOException ie) {
       throw new YarnRuntimeException("Failed to login", ie);
     }
-
     super.serviceStart();
   }
   
@@ -753,7 +817,8 @@ public class ResourceManager extends Com
 
   protected ClientRMService createClientRMService() {
     return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
-        this.applicationACLsManager, this.rmDTSecretManager);
+        this.applicationACLsManager, this.queueACLsManager,
+        this.rmDTSecretManager);
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
@@ -834,6 +899,11 @@ public class ResourceManager extends Com
   }
 
   @Private
+  public QueueACLsManager getQueueACLsManager() {
+    return this.queueACLsManager;
+  }
+
+  @Private
   public RMContainerTokenSecretManager getRMContainerTokenSecretManager() {
     return this.containerTokenSecretManager;
   }
@@ -856,7 +926,7 @@ public class ResourceManager extends Com
     // recover applications
     rmAppManager.recover(state);
   }
-  
+
   public static void main(String argv[]) {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
@@ -866,6 +936,7 @@ public class ResourceManager extends Com
       ShutdownHookManager.get().addShutdownHook(
         new CompositeServiceShutdownHook(resourceManager),
         SHUTDOWN_HOOK_PRIORITY);
+      setHttpPolicy(conf);
       resourceManager.init(conf);
       resourceManager.start();
     } catch (Throwable t) {
@@ -873,4 +944,10 @@ public class ResourceManager extends Com
       System.exit(-1);
     }
   }
+  
+  private static void setHttpPolicy(Configuration conf) {
+    HttpConfig.setPolicy(Policy.fromString(conf.get(
+      YarnConfiguration.YARN_HTTP_POLICY_KEY,
+      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
+  }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Thu Oct 17 05:32:42 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 public class ResourceTrackerService extends AbstractService implements
     ResourceTracker {
@@ -73,6 +75,7 @@ public class ResourceTrackerService exte
   private long nextHeartBeatInterval;
   private Server server;
   private InetSocketAddress resourceTrackerAddress;
+  private String minimumNodeManagerVersion;
 
   private static final NodeHeartbeatResponse resync = recordFactory
       .newRecordInstance(NodeHeartbeatResponse.class);
@@ -99,6 +102,7 @@ public class ResourceTrackerService exte
     this.nmLivelinessMonitor = nmLivelinessMonitor;
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.nmTokenSecretManager = nmTokenSecretManager;
+
   }
 
   @Override
@@ -124,7 +128,11 @@ public class ResourceTrackerService exte
     minAllocVcores = conf.getInt(
     	YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
     	YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
-    
+
+    minimumNodeManagerVersion = conf.get(
+        YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
+        YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
+
     super.serviceInit(conf);
   }
 
@@ -172,10 +180,30 @@ public class ResourceTrackerService exte
     int cmPort = nodeId.getPort();
     int httpPort = request.getHttpPort();
     Resource capability = request.getResource();
+    String nodeManagerVersion = request.getNMVersion();
 
     RegisterNodeManagerResponse response = recordFactory
         .newRecordInstance(RegisterNodeManagerResponse.class);
 
+    if (!minimumNodeManagerVersion.equals("NONE")) {
+      if (minimumNodeManagerVersion.equals("EqualToRM")) {
+        minimumNodeManagerVersion = YarnVersionInfo.getVersion();
+      }
+
+      if ((nodeManagerVersion == null) ||
+          (VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) {
+        String message =
+            "Disallowed NodeManager Version " + nodeManagerVersion
+                + ", is less than the minimum version "
+                + minimumNodeManagerVersion + " sending SHUTDOWN signal to "
+                + "NodeManager.";
+        LOG.info(message);
+        response.setDiagnosticsMessage(message);
+        response.setNodeAction(NodeAction.SHUTDOWN);
+        return response;
+      }
+    }
+
     // Check if this node is a 'valid' node
     if (!this.nodesListManager.isValidNode(host)) {
       String message =
@@ -206,7 +234,7 @@ public class ResourceTrackerService exte
         .getCurrentKey());    
 
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
-        resolve(host), capability);
+        resolve(host), capability, nodeManagerVersion);
 
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {
@@ -229,7 +257,8 @@ public class ResourceTrackerService exte
             + ", assigned nodeId " + nodeId;
     LOG.info(message);
     response.setNodeAction(NodeAction.NORMAL);
-    response.setRMIdentifier(ResourceManager.clusterTimeStamp);
+    response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
+    response.setRMVersion(YarnVersionInfo.getVersion());
     return response;
   }
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Thu Oct 17 05:32:42 2013
@@ -63,12 +63,6 @@ public class FileSystemRMStateStore exte
   public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
 
   private static final String ROOT_DIR_NAME = "FSRMStateRoot";
-  private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
-  private static final String RM_APP_ROOT = "RMAppRoot";
-  private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
-  private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
-  private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
-      "RMDTSequenceNumber_";
 
   protected FileSystem fs;
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Thu Oct 17 05:32:42 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -108,7 +109,9 @@ public class MemoryRMStateStore extends 
 
     ApplicationState appState = state.getApplicationState().get(
         attemptState.getAttemptId().getApplicationId());
-    assert appState != null;
+    if (appState == null) {
+      throw new YarnRuntimeException("Application doesn't exist");
+    }
 
     if (appState.attempts.containsKey(attemptState.getAttemptId())) {
       Exception e = new IOException("Attempt: " +
@@ -125,7 +128,9 @@ public class MemoryRMStateStore extends 
                                                             throws Exception {
     ApplicationId appId = appState.getAppId();
     ApplicationState removed = state.appState.remove(appId);
-    assert removed != null;
+    if (removed == null) {
+      throw new YarnRuntimeException("Removing non-exsisting application state");
+    }
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Thu Oct 17 05:32:42 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
 
@@ -64,6 +65,14 @@ import org.apache.hadoop.yarn.server.res
  */
 public abstract class RMStateStore extends AbstractService {
 
+  // constants for RM App state and RMDTSecretManagerState.
+  protected static final String RM_APP_ROOT = "RMAppRoot";
+  protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
+  protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
+  protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
+  protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
+      "RMDTSequenceNumber_";
+
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
   public RMStateStore() {
@@ -463,8 +472,9 @@ public abstract class RMStateStore exten
               (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
                   .newApplicationAttemptStateData(attemptState.getAttemptId(),
                     attemptState.getMasterContainer(), appAttemptTokens);
-
-            LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+            }
             storeApplicationAttemptState(attemptState.getAttemptId().toString(), 
                                          attemptStateData);
           } catch (Exception e) {
@@ -482,12 +492,15 @@ public abstract class RMStateStore exten
           ApplicationState appState = 
                           ((RMStateStoreRemoveAppEvent) event).getAppState();
           ApplicationId appId = appState.getAppId();
-          
+          Exception removedException = null;
           LOG.info("Removing info for app: " + appId);
           try {
             removeApplicationState(appState);
           } catch (Exception e) {
             LOG.error("Error removing app: " + appId, e);
+            removedException = e;
+          } finally {
+            notifyDoneRemovingApplcation(appId, removedException);
           }
         }
         break;
@@ -521,7 +534,18 @@ public abstract class RMStateStore exten
     rmDispatcher.getEventHandler().handle(
         new RMAppAttemptStoredEvent(attemptId, storedException));
   }
-  
+
+  @SuppressWarnings("unchecked")
+  /**
+   * Notifies RMApp that this application has been removed from the
+   * RMStateStore.
+   */
+  private void notifyDoneRemovingApplcation(ApplicationId appId,
+      Exception removedException) {
+    rmDispatcher.getEventHandler().handle(
+      new RMAppRemovedEvent(appId, removedException));
+  }
+
   /**
    * EventHandler implementation which forward events to the FSRMStateStore
    * This hides the EventHandle methods of the store from its public interface 

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Thu Oct 17 05:32:42 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -194,4 +195,20 @@ public interface RMApp extends EventHand
    * @return the application type.
    */
   String getApplicationType(); 
+
+  /**
+   * Check whether this application is safe to unregister.
+   * An application is deemed to be safe to unregister if it is an unmanaged
+   * AM or its state has been removed from state store.
+   * @return the flag which indicates whether this application is safe to
+   *         unregister.
+   */
+  boolean isAppSafeToUnregister();
+
+  /**
+   * Create the external user-facing state of ApplicationMaster from the
+   * current state of the {@link RMApp}.
+   * @return the external user-facing state of ApplicationMaster.
+   */
+  YarnApplicationState createApplicationState();
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Thu Oct 17 05:32:42 2013
@@ -27,11 +27,14 @@ public enum RMAppEventType {
   // Source: RMAppAttempt
   APP_REJECTED,
   APP_ACCEPTED,
-  APP_SAVED,
   ATTEMPT_REGISTERED,
-  ATTEMPT_FINISHING,
+  ATTEMPT_UNREGISTERED,
   ATTEMPT_FINISHED, // Will send the final state
   ATTEMPT_FAILED,
   ATTEMPT_KILLED,
-  NODE_UPDATE
+  NODE_UPDATE,
+
+  // Source: RMStateStore
+  APP_SAVED,
+  APP_REMOVED
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Oct 17 05:32:42 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -109,6 +111,8 @@ public class RMAppImpl implements RMApp,
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
+  private boolean isAppRemovalRequestSent = false;
+  private RMAppState previousStateAtRemoving;
 
   private static final StateMachineFactory<RMAppImpl,
                                            RMAppState,
@@ -167,8 +171,9 @@ public class RMAppImpl implements RMApp,
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
-        RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
+          RMAppEventType.ATTEMPT_UNREGISTERED,
+        new RMAppRemovingTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
     .addTransition(RMAppState.RUNNING,
@@ -178,6 +183,17 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
 
+     // Transitions from REMOVING state
+    .addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
+        RMAppEventType.APP_REMOVED,  new RMAppFinishingTransition())
+    .addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
+        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.REMOVING, RMAppState.KILLED,
+        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    // ignorable transitions
+    .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
+        RMAppEventType.NODE_UPDATE)
+
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
@@ -185,36 +201,34 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-        RMAppEventType.NODE_UPDATE)
+      EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
 
      // Transitions from FINISHED state
-    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
-        RMAppEventType.KILL)
      // ignorable transitions
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
         EnumSet.of(
             RMAppEventType.NODE_UPDATE,
-            RMAppEventType.ATTEMPT_FINISHING,
-            RMAppEventType.ATTEMPT_FINISHED))
+            RMAppEventType.ATTEMPT_UNREGISTERED,
+            RMAppEventType.ATTEMPT_FINISHED,
+            RMAppEventType.KILL,
+            RMAppEventType.APP_REMOVED))
 
      // Transitions from FAILED state
-    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED))
      // ignorable transitions
-    .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
-        RMAppEventType.NODE_UPDATE)
+    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+          RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
 
      // Transitions from KILLED state
+     // ignorable transitions
     .addTransition(
         RMAppState.KILLED,
         RMAppState.KILLED,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED))
-     // ignorable transitions
-    .addTransition(RMAppState.KILLED, RMAppState.KILLED,
-        RMAppEventType.NODE_UPDATE)
+            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE,
+            RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
 
      .installTopology();
 
@@ -384,6 +398,7 @@ public class RMAppImpl implements RMApp,
     case SUBMITTED:
     case ACCEPTED:
     case RUNNING:
+    case REMOVING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
     case FINISHING:
@@ -475,7 +490,7 @@ public class RMAppImpl implements RMApp,
       return BuilderUtils.newApplicationReport(this.applicationId,
           currentApplicationAttemptId, this.user, this.queue,
           this.name, host, rpcPort, clientToAMToken,
-          RMServerUtils.createApplicationState(this.stateMachine.getCurrentState()), diags,
+          createApplicationState(), diags,
           trackingUrl, this.startTime, this.finishTime, finishState,
           appUsageReport, origTrackingUrl, progress, this.applicationType, 
           amrmToken);
@@ -569,7 +584,7 @@ public class RMAppImpl implements RMApp,
   }
   
   @Override
-  public void recover(RMState state) {
+  public void recover(RMState state) throws Exception{
     ApplicationState appState = state.getApplicationState().get(getApplicationId());
     LOG.info("Recovering app: " + getApplicationId() + " with " + 
             + appState.getAttemptCount() + " attempts");
@@ -637,10 +652,18 @@ public class RMAppImpl implements RMApp,
     };
   }
 
-  private static final class RMAppFinishingTransition extends
-      RMAppTransition {
+  private static final class RMAppFinishingTransition extends RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
+      if (event.getType().equals(RMAppEventType.APP_REMOVED)) {
+        RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event;
+        if (removeEvent.getRemovedException() != null) {
+          LOG.error(
+            "Failed to remove application: " + removeEvent.getApplicationId(),
+            removeEvent.getRemovedException());
+          ExitUtil.terminate(1, removeEvent.getRemovedException());
+        }
+      }
       app.finishTime = System.currentTimeMillis();
     }
   }
@@ -657,6 +680,15 @@ public class RMAppImpl implements RMApp,
     }
   }
 
+  private static final class RMAppRemovingTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      LOG.info("Removing application with id " + app.applicationId);
+      app.removeApplicationState();
+      app.previousStateAtRemoving = app.getState();
+    }
+  }
+
   private static class AppFinishedTransition extends FinalTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
       RMAppFinishedAttemptEvent finishedEvent =
@@ -712,6 +744,9 @@ public class RMAppImpl implements RMApp,
       if (app.getState() != RMAppState.FINISHING) {
         app.finishTime = System.currentTimeMillis();
       }
+      // The application is completely done, so remove it from the state store.
+      app.removeApplicationState();
+
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -764,4 +799,52 @@ public class RMAppImpl implements RMApp,
   public String getApplicationType() {
     return this.applicationType;
   }
+
+  @Override
+  public boolean isAppSafeToUnregister() {
+    RMAppState state = getState();
+    return state.equals(RMAppState.FINISHING)
+        || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
+        || state.equals(RMAppState.KILLED) ||
+        // If this is an unmanaged AM, it is safe to unregister, since an
+        // unmanaged AM immediately goes to FINISHED state on AM unregistration
+        getApplicationSubmissionContext().getUnmanagedAM();
+  }
+
+  @Override
+  public YarnApplicationState createApplicationState() {
+    RMAppState rmAppState = getState();
+    // If App is in REMOVING state, return its previous state.
+    if (rmAppState.equals(RMAppState.REMOVING)) {
+      rmAppState = previousStateAtRemoving;
+    }
+    switch (rmAppState) {
+    case NEW:
+      return YarnApplicationState.NEW;
+    case NEW_SAVING:
+      return YarnApplicationState.NEW_SAVING;
+    case SUBMITTED:
+      return YarnApplicationState.SUBMITTED;
+    case ACCEPTED:
+      return YarnApplicationState.ACCEPTED;
+    case RUNNING:
+      return YarnApplicationState.RUNNING;
+    case FINISHING:
+    case FINISHED:
+      return YarnApplicationState.FINISHED;
+    case KILLED:
+      return YarnApplicationState.KILLED;
+    case FAILED:
+      return YarnApplicationState.FAILED;
+    default:
+      throw new YarnRuntimeException("Unknown state passed!");
+    }
+  }
+
+  private void removeApplicationState(){
+    if (!isAppRemovalRequestSent) {
+      rmContext.getStateStore().removeApplication(this);
+      isAppRemovalRequestSent = true;
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Thu Oct 17 05:32:42 2013
@@ -24,6 +24,7 @@ public enum RMAppState {
   SUBMITTED,
   ACCEPTED,
   RUNNING,
+  REMOVING,
   FINISHING,
   FINISHED,
   FAILED,

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Thu Oct 17 05:32:42 2013
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import javax.crypto.SecretKey;
 
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -151,9 +152,12 @@ public interface RMAppAttempt extends Ev
   Token<AMRMTokenIdentifier> getAMRMToken();
 
   /**
-   * The master key for client-to-AM tokens for this app attempt
+   * The master key for client-to-AM tokens for this app attempt. This is only
+   * used for RMStateStore. Normal operation must invoke the secret manager to
+   * get the key and not use the local key directly.
    * @return The master key for client-to-AM tokens for this app attempt
    */
+  @LimitedPrivate("RMStateStore")
   SecretKey getClientTokenMasterKey();
 
   /**

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Thu Oct 17 05:32:42 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -56,7 +57,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -99,6 +99,7 @@ import org.apache.hadoop.yarn.state.Sing
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@@ -380,7 +381,7 @@ public class RMAppAttemptImpl implements
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
 
-    this.proxiedTrackingUrl = generateProxyUriWithoutScheme();
+    this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
     
     this.stateMachine = stateMachineFactory.make(this);
     this.user = user;
@@ -469,22 +470,17 @@ public class RMAppAttemptImpl implements
     }    
   }
   
-  private String generateProxyUriWithoutScheme() {
-    return generateProxyUriWithoutScheme(null);
-  }
-  
-  private String generateProxyUriWithoutScheme(
+  private String generateProxyUriWithScheme(
       final String trackingUriWithoutScheme) {
     this.readLock.lock();
     try {
       URI trackingUri = StringUtils.isEmpty(trackingUriWithoutScheme) ? null :
         ProxyUriUtils.getUriFromAMUrl(trackingUriWithoutScheme);
-      String proxy = YarnConfiguration.getProxyHostAndPort(conf);
+      String proxy = WebAppUtils.getProxyHostAndPort(conf);
       URI proxyUri = ProxyUriUtils.getUriFromAMUrl(proxy);
       URI result = ProxyUriUtils.getProxyUri(trackingUri, proxyUri,
           applicationAttemptId.getApplicationId());
-      //We need to strip off the scheme to have it match what was there before
-      return result.toASCIIString().substring(HttpConfig.getSchemePrefix().length());
+      return result.toASCIIString();
     } catch (URISyntaxException e) {
       LOG.warn("Could not proxify "+trackingUriWithoutScheme,e);
       return trackingUriWithoutScheme;
@@ -495,11 +491,13 @@ public class RMAppAttemptImpl implements
 
   private void setTrackingUrlToRMAppPage() {
     origTrackingUrl = pjoin(
-        YarnConfiguration.getRMWebAppHostAndPort(conf),
+        WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
         "cluster", "app", getAppAttemptId().getApplicationId());
     proxiedTrackingUrl = origTrackingUrl;
   }
 
+  // This is only used for RMStateStore. Normal operation must invoke the secret
+  // manager to get the key and not use the local key directly.
   @Override
   public SecretKey getClientTokenMasterKey() {
     return this.clientTokenMasterKey;
@@ -675,7 +673,7 @@ public class RMAppAttemptImpl implements
   }
 
   @Override
-  public void recover(RMState state) {
+  public void recover(RMState state) throws Exception{
     ApplicationState appState = 
         state.getApplicationState().get(getAppAttemptId().getApplicationId());
     ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
@@ -690,7 +688,8 @@ public class RMAppAttemptImpl implements
                                  RMAppAttemptEventType.RECOVER));
   }
 
-  private void recoverAppAttemptCredentials(Credentials appAttemptTokens) {
+  private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
+      throws IOException {
     if (appAttemptTokens == null) {
       return;
     }
@@ -707,11 +706,7 @@ public class RMAppAttemptImpl implements
     this.amrmToken =
         (Token<AMRMTokenIdentifier>) appAttemptTokens
           .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
-
-    // For now, no need to populate tokens back to AMRMTokenSecretManager,
-    // because running attempts are rebooted. Later in work-preserve restart,
-    // we'll create NEW->RUNNING transition in which the restored tokens will be
-    // added to the secret manager
+    rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
   }
 
   private static class BaseTransition implements
@@ -736,9 +731,9 @@ public class RMAppAttemptImpl implements
           .registerAppAttempt(appAttempt.applicationAttemptId);
 
       if (UserGroupInformation.isSecurityEnabled()) {
-        appAttempt.clientTokenMasterKey = appAttempt.rmContext
-            .getClientToAMTokenSecretManager()
-            .registerApplication(appAttempt.applicationAttemptId);
+        appAttempt.clientTokenMasterKey =
+            appAttempt.rmContext.getClientToAMTokenSecretManager()
+              .createMasterKey(appAttempt.applicationAttemptId);
       }
 
       // create AMRMToken
@@ -924,6 +919,12 @@ public class RMAppAttemptImpl implements
                             RMAppAttemptEvent event) {
       // Register with AMLivelinessMonitor
       appAttempt.attemptLaunched();
+
+      // Register the ClientTokenMasterKey after it is saved in the store;
+      // otherwise the client may hold an invalid ClientToken after RM restarts.
+      appAttempt.rmContext.getClientToAMTokenSecretManager()
+      .registerApplication(appAttempt.getAppAttemptId(),
+        appAttempt.getClientTokenMasterKey());
     }
   }
   
@@ -988,7 +989,7 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  static final class AMRegisteredTransition extends BaseTransition {
+  private static final class AMRegisteredTransition extends BaseTransition {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
@@ -997,9 +998,10 @@ public class RMAppAttemptImpl implements
           = (RMAppAttemptRegistrationEvent) event;
       appAttempt.host = registrationEvent.getHost();
       appAttempt.rpcPort = registrationEvent.getRpcport();
-      appAttempt.origTrackingUrl = registrationEvent.getTrackingurl();
+      appAttempt.origTrackingUrl =
+          sanitizeTrackingUrl(registrationEvent.getTrackingurl());
       appAttempt.proxiedTrackingUrl = 
-        appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
+        appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
 
       // Let the app know
       appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
@@ -1132,9 +1134,10 @@ public class RMAppAttemptImpl implements
       RMAppAttemptUnregistrationEvent unregisterEvent
         = (RMAppAttemptUnregistrationEvent) event;
       appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
-      appAttempt.origTrackingUrl = unregisterEvent.getTrackingUrl();
+      appAttempt.origTrackingUrl =
+          sanitizeTrackingUrl(unregisterEvent.getTrackingUrl());
       appAttempt.proxiedTrackingUrl = 
-        appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
+        appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
       appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
 
       // Tell the app
@@ -1149,7 +1152,7 @@ public class RMAppAttemptImpl implements
       ApplicationId applicationId =
           appAttempt.getAppAttemptId().getApplicationId();
       appAttempt.eventHandler.handle(
-          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING));
+          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
       return RMAppAttemptState.FINISHING;
     }
   }
@@ -1286,4 +1289,8 @@ public class RMAppAttemptImpl implements
     appAttempt.rmContext.getAMRMTokenSecretManager()
       .applicationMasterFinished(appAttempt.getAppAttemptId());
   }
+
+  private static String sanitizeTrackingUrl(String url) {
+    return (url == null || url.trim().isEmpty()) ? "N/A" : url;
+  }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Thu Oct 17 05:32:42 2013
@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 
 /**
  * Node managers information on available resources 
@@ -84,7 +83,13 @@ public interface RMNode {
    * @return the time of the latest health report received from this node.
    */
   public long getLastHealthReportTime();
-  
+
+  /**
+   * The version of the node manager on this node, as received during the
+   * node's registration with the resource manager.
+   */
+  public String getNodeManagerVersion();
+
   /**
    * the total available resource.
    * @return the total available resource.

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Thu Oct 17 05:32:42 2013
@@ -97,6 +97,7 @@ public class RMNodeImpl implements RMNod
 
   private String healthReport;
   private long lastHealthReportTime;
+  private String nodeManagerVersion;
 
   /* set of containers that have just launched */
   private final Map<ContainerId, ContainerStatus> justLaunchedContainers = 
@@ -172,7 +173,7 @@ public class RMNodeImpl implements RMNod
                              RMNodeEvent> stateMachine;
 
   public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
-      int cmPort, int httpPort, Node node, Resource capability) {
+      int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) {
     this.nodeId = nodeId;
     this.context = context;
     this.hostName = hostName;
@@ -184,6 +185,7 @@ public class RMNodeImpl implements RMNod
     this.node = node;
     this.healthReport = "Healthy";
     this.lastHealthReportTime = System.currentTimeMillis();
+    this.nodeManagerVersion = nodeManagerVersion;
 
     this.latestNodeHeartBeatResponse.setResponseId(0);
 
@@ -289,6 +291,11 @@ public class RMNodeImpl implements RMNod
   }
 
   @Override
+  public String getNodeManagerVersion() {
+    return nodeManagerVersion;
+  }
+
+  @Override
   public NodeState getState() {
     this.readLock.lock();
 
@@ -460,8 +467,11 @@ public class RMNodeImpl implements RMNod
           && rmNode.getHttpPort() == newNode.getHttpPort()) {
         // Reset heartbeat ID since node just restarted.
         rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
-        rmNode.context.getDispatcher().getEventHandler().handle(
-            new NodeAddedSchedulerEvent(rmNode));
+        if (rmNode.getState() != NodeState.UNHEALTHY) {
+          // Only add new node if old state is not UNHEALTHY
+          rmNode.context.getDispatcher().getEventHandler().handle(
+              new NodeAddedSchedulerEvent(rmNode));
+         }
       } else {
         // Reconnected node differs, so replace old node and start new node
         switch (rmNode.getState()) {

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Thu Oct 17 05:32:42 2013
@@ -116,14 +116,11 @@ public class AppSchedulingInfo {
    * The ApplicationMaster is updating resource requirements for the
    * application, by asking for more resources and releasing resources acquired
    * by the application.
-   * 
+   *
    * @param requests resources to be acquired
-   * @param blacklistAdditions resources to be added to the blacklist
-   * @param blacklistRemovals resources to be removed from the blacklist
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests,
-      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+      List<ResourceRequest> requests) {
     QueueMetrics metrics = queue.getMetrics();
     
     // Update resource requests
@@ -181,11 +178,16 @@ public class AppSchedulingInfo {
                 lastRequestContainers)));
       }
     }
+  }
 
-    //
-    // Update blacklist
-    //
-    
+  /**
+   * The ApplicationMaster is updating the blacklist
+   *
+   * @param blacklistAdditions resources to be added to the blacklist
+   * @param blacklistRemovals resources to be removed from the blacklist
+   */
+  synchronized public void updateBlacklist(
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
     // Add to blacklist
     if (blacklistAdditions != null) {
       blacklist.addAll(blacklistAdditions);

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Thu Oct 17 05:32:42 2013
@@ -64,4 +64,6 @@ public interface Queue {
    * @return queue ACLs for user
    */
   List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
+
+  boolean hasAccess(QueueACL acl, UserGroupInformation user);
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Thu Oct 17 05:32:42 2013
@@ -73,7 +73,7 @@ public class QueueMetrics implements Met
   @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
   @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
   @Metric("# of active users") MutableGaugeInt activeUsers;
-  @Metric("# of active users") MutableGaugeInt activeApplications;
+  @Metric("# of active applications") MutableGaugeInt activeApplications;
   private final MutableGaugeInt[] runningTime;
   private TimeBucketMetrics<ApplicationId> runBuckets;
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Thu Oct 17 05:32:42 2013
@@ -25,9 +25,11 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -134,4 +136,17 @@ public interface YarnScheduler extends E
   @LimitedPrivate("yarn")
   @Evolving
   QueueMetrics getRootQueueMetrics();
+
+  /**
+   * Check if the user has permission to perform the operation.
+   * If the user has {@link QueueACL#ADMINISTER_QUEUE} permission,
+   * this user can view/modify the applications in this queue
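+   * @param callerUGI the user whose permission is being checked
+   * @param acl the queue ACL to check the user against
+   * @param queueName the name of the queue to check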
+   * @return <code>true</code> if the user has the permission,
+   *         <code>false</code> otherwise
+   */
+  boolean checkAccess(UserGroupInformation callerUGI,
+      QueueACL acl, String queueName);
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Thu Oct 17 05:32:42 2013
@@ -99,15 +99,11 @@ class CSQueueUtils {
           Resources.divide(calculator, clusterResource, 
               usedResources, queueLimit);
     }
-    
+
     childQueue.setUsedCapacity(usedCapacity);
     childQueue.setAbsoluteUsedCapacity(absoluteUsedCapacity);
     
-    Resource available = 
-        Resources.roundUp(
-            calculator, 
-            Resources.subtract(queueLimit, usedResources), 
-            minimumAllocation);
+    Resource available = Resources.subtract(queueLimit, usedResources);
     childQueue.getMetrics().setAvailableResourcesToQueue(
         Resources.max(
             calculator, 

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Oct 17 05:32:42 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -571,8 +572,7 @@ public class CapacityScheduler
         application.showRequests();
   
         // Update application requests
-        application.updateResourceRequests(ask, 
-            blacklistAdditions, blacklistRemovals);
+        application.updateResourceRequests(ask);
   
         LOG.debug("allocate: post-update");
         application.showRequests();
@@ -584,6 +584,8 @@ public class CapacityScheduler
           " #ask=" + ask.size());
       }
 
+      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+
       return application.getAllocation(getResourceCalculator(),
                    clusterResource, getMinimumResourceCapability());
     }
@@ -913,4 +915,18 @@ public class CapacityScheduler
         RMContainerEventType.KILL);
   }
 
+  @Override
+  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+      QueueACL acl, String queueName) {
+    CSQueue queue = getQueue(queueName);
+    if (queue == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ACL not found for queue access-type " + acl
+            + " for queue " + queueName);
+      }
+      return false;
+    }
+    return queue.hasAccess(acl, callerUGI);
+  }
+
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Oct 17 05:32:42 2013
@@ -644,7 +644,8 @@ public class LeafQueue implements CSQueu
 
     // Check queue ACLs
     UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(userName);
-    if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) {
+    if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
+        && !hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
       throw new AccessControlException("User " + userName + " cannot submit" +
           " applications to queue " + getQueuePath());
     }
@@ -1609,9 +1610,12 @@ public class LeafQueue implements CSQueu
 
   }
 
-  // need to access the list of apps from the preemption monitor
+  /**
+   * Obtain the live set of active applications; callers must not modify it.
+   */
   public Set<FiCaSchedulerApp> getApplications() {
-    return Collections.unmodifiableSet(activeApplications);
+    // need to access the list of apps from the preemption monitor
+    return activeApplications;
   }
 
   // return a single Resource capturing the overal amount of pending resources

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Thu Oct 17 05:32:42 2013
@@ -141,10 +141,16 @@ public class FiCaSchedulerApp extends Sc
   }
 
   public synchronized void updateResourceRequests(
-      List<ResourceRequest> requests, 
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      this.appSchedulingInfo.updateResourceRequests(requests);
+    }
+  }
+
+  public synchronized void updateBlacklist(
       List<String> blacklistAdditions, List<String> blacklistRemovals) {
     if (!isStopped) {
-      this.appSchedulingInfo.updateResourceRequests(requests, 
+      this.appSchedulingInfo.updateBlacklist(
           blacklistAdditions, blacklistRemovals);
     }
   }

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Thu Oct 17 05:32:42 2013
@@ -192,10 +192,6 @@ public class AppSchedulable extends Sche
       RMContainer rmContainer = app.reserve(node, priority, null,
           container);
       node.reserveResource(app, priority, rmContainer);
-      getMetrics().reserveResource(app.getUser(),
-          container.getResource());
-      scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
-          container.getResource());
     }
 
     else {
@@ -216,8 +212,6 @@ public class AppSchedulable extends Sche
     node.unreserveResource(app);
     getMetrics().unreserveResource(
         app.getUser(), rmContainer.getContainer().getResource());
-    scheduler.getRootQueueMetrics().unreserveResource(
-        app.getUser(), rmContainer.getContainer().getResource());
   }
 
   /**
@@ -274,7 +268,9 @@ public class AppSchedulable extends Sche
   }
 
   private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
-    LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
+    }
 
     if (reserved) {
       RMContainer rmContainer = node.getReservedContainer();
@@ -316,10 +312,19 @@ public class AppSchedulable extends Sche
               + localRequest);
         }
         
-        NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
-            scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
-            scheduler.getRackLocalityThreshold());
-        
+        NodeType allowedLocality;
+        if (scheduler.isContinuousSchedulingEnabled()) {
+          allowedLocality = app.getAllowedLocalityLevelByTime(priority,
+                  scheduler.getNodeLocalityDelayMs(),
+                  scheduler.getRackLocalityDelayMs(),
+                  scheduler.getClock().getTime());
+        } else {
+          allowedLocality = app.getAllowedLocalityLevel(priority,
+                  scheduler.getNumClusterNodes(),
+                  scheduler.getNodeLocalityThreshold(),
+                  scheduler.getRackLocalityThreshold());
+        }
+
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
           return assignContainer(node, priority,

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Thu Oct 17 05:32:42 2013
@@ -146,7 +146,7 @@ public class FSLeafQueue extends FSQueue
   public Resource assignContainer(FSSchedulerNode node) {
     Resource assigned = Resources.none();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Node offered to queue: " + getName());
+      LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName());
     }
 
     if (!assignContainerPreCheck(node)) {

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java Thu Oct 17 05:32:42 2013
@@ -21,12 +21,14 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
+@Metrics(context="yarn")
 public class FSQueueMetrics extends QueueMetrics {
 
   @Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;

Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu Oct 17 05:32:42 2013
@@ -138,7 +138,7 @@ public class FSSchedulerApp extends Sche
 
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
-    this.appSchedulingInfo.updateResourceRequests(requests, null, null);
+    this.appSchedulingInfo.updateResourceRequests(requests);
   }
 
   public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
@@ -464,7 +464,12 @@ public class FSSchedulerApp extends Sche
    * @param priority The priority of the container scheduled.
    */
   synchronized public void resetSchedulingOpportunities(Priority priority) {
-    lastScheduledContainer.put(priority, System.currentTimeMillis());
+    resetSchedulingOpportunities(priority, System.currentTimeMillis());
+  }
+  // used for continuous scheduling
+  synchronized public void resetSchedulingOpportunities(Priority priority,
+                                                        long currentTimeMs) {
+    lastScheduledContainer.put(priority, currentTimeMs);
     schedulingOpportunities.setCount(priority, 0);
   }
 
@@ -513,6 +518,55 @@ public class FSSchedulerApp extends Sche
     return allowedLocalityLevel.get(priority);
   }
 
+  /**
+   * Return the level at which we are allowed to schedule containers,
+   * given the thresholds indicating how much time must pass before
+   * relaxing scheduling constraints.
+   */
+  public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
+          long nodeLocalityDelayMs, long rackLocalityDelayMs,
+          long currentTimeMs) {
+
+    // if either delay is negative (i.e. not being used), can schedule anywhere
+    if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // default level is NODE_LOCAL
+    if (! allowedLocalityLevel.containsKey(priority)) {
+      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
+      return NodeType.NODE_LOCAL;
+    }
+
+    NodeType allowed = allowedLocalityLevel.get(priority);
+
+    // if level is already most liberal, we're done
+    if (allowed.equals(NodeType.OFF_SWITCH)) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // check waiting time
+    long waitTime = currentTimeMs;
+    if (lastScheduledContainer.containsKey(priority)) {
+      waitTime -= lastScheduledContainer.get(priority);
+    } else {
+      waitTime -= appSchedulable.getStartTime();
+    }
+
+    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
+            nodeLocalityDelayMs : rackLocalityDelayMs;
+
+    if (waitTime > thresholdTime) {
+      if (allowed.equals(NodeType.NODE_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
+        resetSchedulingOpportunities(priority, currentTimeMs);
+      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
+        resetSchedulingOpportunities(priority, currentTimeMs);
+      }
+    }
+    return allowedLocalityLevel.get(priority);
+  }
 
   synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
       Priority priority, ResourceRequest request,



Mime
View raw message