hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [06/19] hadoop git commit: YARN-2883. Queuing of container requests in the NM. (Konstantinos Karanasos and Arun Suresh via kasha)
Date Thu, 19 May 2016 06:37:34 GMT
YARN-2883. Queuing of container requests in the NM. (Konstantinos Karanasos and Arun Suresh via kasha)

(cherry picked from commit c8172f5f143d2fefafa5a412899ab7cd081b406d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b56fc51b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b56fc51b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b56fc51b

Branch: refs/heads/branch-2
Commit: b56fc51b705ed92823fe876b28f199020a093e44
Parents: 68964bb
Author: Karthik Kambatla <kasha@apache.org>
Authored: Wed Apr 20 09:50:37 2016 -0700
Committer: Arun Suresh <asuresh@apache.org>
Committed: Wed May 18 22:08:20 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/api/records/ContainerState.java |   5 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |   5 +
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../src/main/resources/yarn-default.xml         |   7 +
 .../hadoop/yarn/server/utils/BuilderUtils.java  |   9 +
 .../hadoop/yarn/server/nodemanager/Context.java |  17 +
 .../yarn/server/nodemanager/NodeManager.java    |  42 +-
 .../containermanager/ContainerManagerImpl.java  | 113 ++--
 .../container/ContainerImpl.java                |   3 +-
 .../monitor/ContainersMonitor.java              |  18 +
 .../monitor/ContainersMonitorImpl.java          | 160 ++++--
 .../queuing/QueuingContainerManagerImpl.java    | 556 +++++++++++++++++++
 .../containermanager/queuing/package-info.java  |  23 +
 .../nodemanager/TestNodeStatusUpdater.java      |   1 -
 .../amrmproxy/BaseAMRMProxyTest.java            |   7 +-
 .../BaseContainerManagerTest.java               |  56 +-
 .../containermanager/TestContainerManager.java  |  78 +--
 .../monitor/MockResourceCalculatorPlugin.java   |   4 +
 .../TestContainersMonitorResourceChange.java    |   9 +-
 .../queuing/TestQueuingContainerManager.java    | 301 ++++++++++
 20 files changed, 1271 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 323d31d..582389f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -34,5 +34,8 @@ public enum ContainerState {
   RUNNING, 
   
   /** Completed container */
-  COMPLETE
+  COMPLETE,
+
+  /** Queued at the NM. */
+  QUEUED
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 52355ab..34e8c1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -675,6 +675,11 @@ public class YarnConfiguration extends Configuration {
   /** Prefix for all node manager configs.*/
   public static final String NM_PREFIX = "yarn.nodemanager.";
 
+  /** Enable Queuing of <code>OPPORTUNISTIC</code> containers. */
+  public static final String NM_CONTAINER_QUEUING_ENABLED = NM_PREFIX
+      + "container-queuing-enabled";
+  public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false;
+
   /** Environment variables that will be sent to containers.*/
   public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env";
   public static final String DEFAULT_NM_ADMIN_USER_ENV = "MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index e5d31dc..3090279 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -82,6 +82,7 @@ enum ContainerStateProto {
   C_NEW = 1;
   C_RUNNING = 2;
   C_COMPLETE = 3;
+  C_QUEUED = 4;
 }
 
 message ContainerProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9cae5fb..a38d0d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -973,6 +973,13 @@
   </property>
 
   <property>
+    <description>Enable Queuing of OPPORTUNISTIC containers on the
+      nodemanager.</description>
+    <name>yarn.nodemanager.container-queuing-enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
     <description>
       Number of seconds after an application finishes before the nodemanager's 
       DeletionService will delete the application's localized file directory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 4fdd43c..a70d143 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -215,6 +216,13 @@ public class BuilderUtils {
   public static ContainerStatus newContainerStatus(ContainerId containerId,
       ContainerState containerState, String diagnostics, int exitStatus,
       Resource capability) {
+    return newContainerStatus(containerId, containerState, diagnostics,
+        exitStatus, capability, ExecutionType.GUARANTEED);
+  }
+
+  public static ContainerStatus newContainerStatus(ContainerId containerId,
+      ContainerState containerState, String diagnostics, int exitStatus,
+      Resource capability, ExecutionType executionType) {
     ContainerStatus containerStatus = recordFactory
       .newRecordInstance(ContainerStatus.class);
     containerStatus.setState(containerState);
@@ -222,6 +230,7 @@ public class BuilderUtils {
     containerStatus.setDiagnostics(diagnostics);
     containerStatus.setExitStatus(exitStatus);
     containerStatus.setCapability(capability);
+    containerStatus.setExecutionType(executionType);
     return containerStatus;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index d3251ae..205e475 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -43,6 +44,15 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 public interface Context {
 
   /**
+   * Interface exposing methods related to the queuing of containers in the NM.
+   */
+  interface QueuingContext {
+    ConcurrentMap<ContainerId, ContainerTokenIdentifier> getQueuedContainers();
+
+    ConcurrentMap<ContainerTokenIdentifier, String> getKilledQueuedContainers();
+  }
+
+  /**
    * Return the nodeId. Usable only when the ContainerManager is started.
    * 
    * @return the NodeId
@@ -89,4 +99,11 @@ public interface Context {
       getLogAggregationStatusForApps();
 
   NodeStatusUpdater getNodeStatusUpdater();
+
+  /**
+   * Returns the <code>QueuingContext</code>, which exposes the containers
+   * currently queued at the NM as well as the queued containers that were
+   * killed.
+   */
+  QueuingContext getQueuingContext();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 7c104d5..e4036ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -57,11 +57,13 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
@@ -170,8 +172,14 @@ public class NodeManager extends CompositeService
       ContainerExecutor exec, DeletionService del,
       NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
       LocalDirsHandlerService dirsHandler) {
-    return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
-      metrics, dirsHandler);
+    if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
+        YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
+      return new QueuingContainerManagerImpl(context, exec, del,
+          nodeStatusUpdater, metrics, dirsHandler);
+    } else {
+      return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
+          metrics, dirsHandler);
+    }
   }
 
   protected WebServer createWebServer(Context nmContext,
@@ -461,6 +469,8 @@ public class NodeManager extends CompositeService
         logAggregationReportForApps;
     private NodeStatusUpdater nodeStatusUpdater;
 
+    private final QueuingContext queuingContext;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -475,6 +485,7 @@ public class NodeManager extends CompositeService
       this.stateStore = stateStore;
       this.logAggregationReportForApps = new ConcurrentLinkedQueue<
           LogAggregationReport>();
+      this.queuingContext = new QueuingNMContext();
     }
 
     /**
@@ -595,8 +606,35 @@ public class NodeManager extends CompositeService
     public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
       this.nodeStatusUpdater = nodeStatusUpdater;
     }
+
+    @Override
+    public QueuingContext getQueuingContext() {
+      return this.queuingContext;
+    }
   }
 
+  /**
+   * Class that keeps the context for containers queued at the NM.
+   */
+  public static class QueuingNMContext implements Context.QueuingContext {
+    protected final ConcurrentMap<ContainerId, ContainerTokenIdentifier>
+        queuedContainers = new ConcurrentSkipListMap<>();
+
+    protected final ConcurrentMap<ContainerTokenIdentifier, String>
+        killedQueuedContainers = new ConcurrentHashMap<>();
+
+    @Override
+    public ConcurrentMap<ContainerId, ContainerTokenIdentifier>
+        getQueuedContainers() {
+      return this.queuedContainers;
+    }
+
+    @Override
+    public ConcurrentMap<ContainerTokenIdentifier, String>
+        getKilledQueuedContainers() {
+      return this.killedQueuedContainers;
+    }
+  }
 
   /**
    * @return the node health checker

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index bb9b3ee..162823c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -160,11 +160,11 @@ public class ContainerManagerImpl extends CompositeService implements
 
   private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
 
-  static final String INVALID_NMTOKEN_MSG = "Invalid NMToken";
+  public static final String INVALID_NMTOKEN_MSG = "Invalid NMToken";
   static final String INVALID_CONTAINERTOKEN_MSG =
       "Invalid ContainerToken";
 
-  final Context context;
+  protected final Context context;
   private final ContainersMonitor containersMonitor;
   private Server server;
   private final ResourceLocalizationService rsrcLocalizationSrvc;
@@ -172,7 +172,7 @@ public class ContainerManagerImpl extends CompositeService implements
   private final AuxServices auxiliaryServices;
   private final NodeManagerMetrics metrics;
 
-  private final NodeStatusUpdater nodeStatusUpdater;
+  protected final NodeStatusUpdater nodeStatusUpdater;
 
   protected LocalDirsHandlerService dirsHandler;
   protected final AsyncDispatcher dispatcher;
@@ -213,14 +213,13 @@ public class ContainerManagerImpl extends CompositeService implements
     auxiliaryServices.registerServiceListener(this);
     addService(auxiliaryServices);
 
-    this.containersMonitor =
-        new ContainersMonitorImpl(exec, dispatcher, this.context);
+    this.containersMonitor = createContainersMonitor(exec);
     addService(this.containersMonitor);
 
     dispatcher.register(ContainerEventType.class,
         new ContainerEventDispatcher());
     dispatcher.register(ApplicationEventType.class,
-        new ApplicationEventDispatcher());
+        createApplicationEventDispatcher());
     dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
     dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
     dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
@@ -235,6 +234,7 @@ public class ContainerManagerImpl extends CompositeService implements
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
+
     LogHandler logHandler =
       createLogHandler(conf, this.context, this.deletionService);
     addIfService(logHandler);
@@ -276,6 +276,10 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) {
+    return new ContainersMonitorImpl(exec, dispatcher, this.context);
+  }
+
   @SuppressWarnings("unchecked")
   private void recover() throws IOException, URISyntaxException {
     NMStateStoreService stateStore = context.getNMStateStore();
@@ -417,6 +421,10 @@ public class ContainerManagerImpl extends CompositeService implements
     return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
   }
 
+  protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
+    return new ApplicationEventDispatcher();
+  }
+
   @Override
   protected void serviceStart() throws Exception {
 
@@ -801,7 +809,8 @@ public class ContainerManagerImpl extends CompositeService implements
               .equals(ContainerType.APPLICATION_MASTER)) {
             this.getAMRMProxyService().processApplicationStartRequest(request);
           }
-
+          performContainerPreStartChecks(nmTokenIdentifier, request,
+              containerTokenIdentifier);
           startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
               request);
           succeededContainers.add(containerId);
@@ -821,6 +830,42 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  private void performContainerPreStartChecks(
+      NMTokenIdentifier nmTokenIdentifier, StartContainerRequest request,
+      ContainerTokenIdentifier containerTokenIdentifier)
+      throws YarnException, InvalidToken {
+  /*
+   * 1) Save the NMToken into the NMTokenSecretManager. This is done here
+   * rather than in the RPC layer because, when the connection is opened and
+   * authenticated, it is not yet known which RPC calls the user will make
+   * on it. Also, a new NMToken is issued only at startContainer (once it
+   * gets renewed).
+   *
+   * 2) Validate the containerToken. The following must hold: a) it is
+   * signed by the correct master key (part of retrieve password); b) it
+   * belongs to the correct Node Manager (part of retrieve password); c) it
+   * has the correct RMIdentifier; d) it is not expired.
+   */
+    authorizeStartAndResourceIncreaseRequest(
+        nmTokenIdentifier, containerTokenIdentifier, true);
+    // update NMToken
+    updateNMTokenIdentifier(nmTokenIdentifier);
+
+    ContainerLaunchContext launchContext = request.getContainerLaunchContext();
+
+    Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
+    if (launchContext.getServiceData()!=null &&
+        !launchContext.getServiceData().isEmpty()) {
+      for (Entry<String, ByteBuffer> meta : launchContext.getServiceData()
+          .entrySet()) {
+        if (null == serviceData.get(meta.getKey())) {
+          throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
+              + " does not exist");
+        }
+      }
+    }
+  }
+
   private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
       String user, Credentials credentials,
       Map<ApplicationAccessType, String> appAcls,
@@ -863,26 +908,10 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   @SuppressWarnings("unchecked")
-  private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+  protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
       ContainerTokenIdentifier containerTokenIdentifier,
       StartContainerRequest request) throws YarnException, IOException {
 
-    /*
-     * 1) It should save the NMToken into NMTokenSecretManager. This is done
-     * here instead of RPC layer because at the time of opening/authenticating
-     * the connection it doesn't know what all RPC calls user will make on it.
-     * Also new NMToken is issued only at startContainer (once it gets renewed).
-     * 
-     * 2) It should validate containerToken. Need to check below things. a) It
-     * is signed by correct master key (part of retrieve password). b) It
-     * belongs to correct Node Manager (part of retrieve password). c) It has
-     * correct RMIdentifier. d) It is not expired.
-     */
-    authorizeStartAndResourceIncreaseRequest(
-        nmTokenIdentifier, containerTokenIdentifier, true);
-    // update NMToken
-    updateNMTokenIdentifier(nmTokenIdentifier);
-
     ContainerId containerId = containerTokenIdentifier.getContainerID();
     String containerIdStr = containerId.toString();
     String user = containerTokenIdentifier.getApplicationSubmitter();
@@ -891,18 +920,6 @@ public class ContainerManagerImpl extends CompositeService implements
 
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
 
-    Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
-    if (launchContext.getServiceData()!=null && 
-        !launchContext.getServiceData().isEmpty()) {
-      for (Map.Entry<String, ByteBuffer> meta : launchContext.getServiceData()
-          .entrySet()) {
-        if (null == serviceData.get(meta.getKey())) {
-          throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
-              + " does not exist");
-        }
-      }
-    }
-
     Credentials credentials =
         YarnServerSecurityUtils.parseCredentials(launchContext);
 
@@ -922,13 +939,14 @@ public class ContainerManagerImpl extends CompositeService implements
 
     this.readLock.lock();
     try {
-      if (!serviceStopped) {
+      if (!isServiceStopped()) {
         // Create the application
-        Application application =
-            new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
+        Application application = new ApplicationImpl(dispatcher, user,
+            applicationID, credentials, context);
         if (null == context.getApplications().putIfAbsent(applicationID,
           application)) {
-          LOG.info("Creating a new application reference for app " + applicationID);
+          LOG.info("Creating a new application reference for app "
+              + applicationID);
           LogAggregationContext logAggregationContext =
               containerTokenIdentifier.getLogAggregationContext();
           Map<ApplicationAccessType, String> appAcls =
@@ -1146,7 +1164,9 @@ public class ContainerManagerImpl extends CompositeService implements
     }
     for (ContainerId id : requests.getContainerIds()) {
       try {
-        stopContainerInternal(identifier, id);
+        Container container = this.context.getContainers().get(id);
+        authorizeGetAndStopContainerRequest(id, container, true, identifier);
+        stopContainerInternal(id);
         succeededRequests.add(id);
       } catch (YarnException e) {
         failedRequests.put(id, SerializedException.newInstance(e));
@@ -1157,13 +1177,11 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   @SuppressWarnings("unchecked")
-  private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
-      ContainerId containerID) throws YarnException, IOException {
+  protected void stopContainerInternal(ContainerId containerID)
+      throws YarnException, IOException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
     LOG.info("Stopping container with container Id: " + containerIDStr);
-    authorizeGetAndStopContainerRequest(containerID, container, true,
-      nmTokenIdentifier);
 
     if (container == null) {
       if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
@@ -1210,7 +1228,7 @@ public class ContainerManagerImpl extends CompositeService implements
       failedRequests);
   }
 
-  private ContainerStatus getContainerStatusInternal(ContainerId containerID,
+  protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
       NMTokenIdentifier nmTokenIdentifier) throws YarnException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
@@ -1406,4 +1424,7 @@ public class ContainerManagerImpl extends CompositeService implements
     this.amrmProxyService = amrmProxyService;
   }
 
+  protected boolean isServiceStopped() {
+    return serviceStopped;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index a493562..b1ddc2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -491,7 +491,8 @@ public class ContainerImpl implements Container {
     this.readLock.lock();
     try {
       return BuilderUtils.newContainerStatus(this.containerId,
-        getCurrentState(), diagnostics.toString(), exitCode, getResource());
+          getCurrentState(), diagnostics.toString(), exitCode, getResource(),
+          this.containerTokenIdentifier.getExecutionType());
     } finally {
       this.readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 4d69dbf..1069b4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -22,8 +22,26 @@ import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
 
 public interface ContainersMonitor extends Service,
     EventHandler<ContainersMonitorEvent>, ResourceView {
   public ResourceUtilization getContainersUtilization();
+
+  ResourceUtilization getContainersAllocation();
+
+  boolean hasResourcesAvailable(ProcessTreeInfo pti);
+
+  void increaseContainersAllocation(ProcessTreeInfo pti);
+
+  void decreaseContainersAllocation(ProcessTreeInfo pti);
+
+  void increaseResourceUtilization(ResourceUtilization resourceUtil,
+      ProcessTreeInfo pti);
+
+  void decreaseResourceUtilization(ResourceUtilization resourceUtil,
+      ProcessTreeInfo pti);
+
+  void subtractNodeResourcesFromResourceUtilization(
+      ResourceUtilization resourceUtil);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 446e7a1..0feac3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -63,7 +63,7 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private final ContainerExecutor containerExecutor;
   private final Dispatcher eventDispatcher;
-  private final Context context;
+  protected final Context context;
   private ResourceCalculatorPlugin resourceCalculatorPlugin;
   private Configuration conf;
   private static float vmemRatio;
@@ -82,6 +82,9 @@ public class ContainersMonitorImpl extends AbstractService implements
   private int nodeCpuPercentageForYARN;
 
   private ResourceUtilization containersUtilization;
+  // Tracks the aggregated allocation of the currently allocated containers
+  // when queuing of containers at the NMs is enabled.
+  private ResourceUtilization containersAllocation;
 
   private volatile boolean stopped = false;
 
@@ -96,6 +99,7 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.monitoringThread = new MonitoringThread();
 
     this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
   }
 
   @Override
@@ -132,10 +136,11 @@ public class ContainersMonitorImpl extends AbstractService implements
         YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
 
     long configuredPMemForContainers =
-        NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L;
+        NodeManagerHardwareUtils.getContainerMemoryMB(
+            this.resourceCalculatorPlugin, conf) * 1024 * 1024L;
 
     long configuredVCoresForContainers =
-        NodeManagerHardwareUtils.getVCores(conf);
+        NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, conf);
 
     // Setting these irrespective of whether checks are enabled. Required in
     // the UI.
@@ -233,8 +238,7 @@ public class ContainersMonitorImpl extends AbstractService implements
     super.serviceStop();
   }
 
-  @VisibleForTesting
-  static class ProcessTreeInfo {
+  public static class ProcessTreeInfo {
     private ContainerId containerId;
     private String pid;
     private ResourceCalculatorProcessTree pTree;
@@ -697,6 +701,82 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.containersUtilization = utilization;
   }
 
+  /**
+   * @return the aggregated allocation of the currently allocated containers.
+   *         The tracked instance itself is returned, not a copy.
+   */
+  @Override
+  public ResourceUtilization getContainersAllocation() {
+    return this.containersAllocation;
+  }
+
+  /**
+   * @return true if there are available allocated resources for the given
+   *         container to start.
+   */
+  @Override
+  public boolean hasResourcesAvailable(ProcessTreeInfo pti) {
+    synchronized (this.containersAllocation) {
+      // Check physical memory.
+      if (this.containersAllocation.getPhysicalMemory() +
+          (int) (pti.getPmemLimit() >> 20) >
+          (int) (getPmemAllocatedForContainers() >> 20)) {
+        return false;
+      }
+      // Check virtual memory.
+      if (isVmemCheckEnabled() &&
+          this.containersAllocation.getVirtualMemory() +
+          (int) (pti.getVmemLimit() >> 20) >
+          (int) (getVmemAllocatedForContainers() >> 20)) {
+        return false;
+      }
+      // Check CPU.
+      if (this.containersAllocation.getCPU()
+          + allocatedCpuUsage(pti) > 1.0f) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void increaseContainersAllocation(ProcessTreeInfo pti) {
+    synchronized (this.containersAllocation) {
+      increaseResourceUtilization(this.containersAllocation, pti);
+    }
+  }
+
+  @Override
+  public void decreaseContainersAllocation(ProcessTreeInfo pti) {
+    synchronized (this.containersAllocation) {
+      decreaseResourceUtilization(this.containersAllocation, pti);
+    }
+  }
+
+  @Override
+  public void increaseResourceUtilization(ResourceUtilization resourceUtil,
+      ProcessTreeInfo pti) {
+    resourceUtil.addTo((int) (pti.getPmemLimit() >> 20),
+        (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
+  }
+
+  @Override
+  public void decreaseResourceUtilization(ResourceUtilization resourceUtil,
+      ProcessTreeInfo pti) {
+    resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20),
+        (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
+  }
+
+  @Override
+  public void subtractNodeResourcesFromResourceUtilization(
+      ResourceUtilization resourceUtil) {
+    resourceUtil.subtractFrom((int) (getPmemAllocatedForContainers() >> 20),
+        (int) (getVmemAllocatedForContainers() >> 20), 1.0f);
+  }
+
+  /**
+   * Converts the vcores of the given container into the CPU value that is
+   * added to, subtracted from, or compared against a
+   * {@link ResourceUtilization} by the allocation methods of this class.
+   * The result is
+   * <code>(vcores * 100 / numProcessors) * maxVCoresAllottedForContainers
+   * / nodeCpuPercentageForYARN</code>.
+   *
+   * @param pti
+   *          the container whose vcores are read.
+   * @return the CPU allocation attributed to the given container.
+   */
+  private float allocatedCpuUsage(ProcessTreeInfo pti) {
+    // Vcores of the container expressed as a percentage of a single core.
+    float cpuUsagePercentPerCore = pti.getCpuVcores() * 100.0f;
+    // Same value spread over all processors reported by the plugin.
+    float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore
+        / resourceCalculatorPlugin.getNumProcessors();
+    // NOTE(review): the "* 1000" and "/ 1000.0f" pair cancels out
+    // arithmetically; presumably kept to mirror a scaling used elsewhere in
+    // this class -- confirm before simplifying.
+    return (cpuUsageTotalCoresPercentage * 1000 *
+        maxVCoresAllottedForContainers / nodeCpuPercentageForYARN) / 1000.0f;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void handle(ContainersMonitorEvent monitoringEvent) {
@@ -714,40 +794,56 @@ public class ContainersMonitorImpl extends AbstractService implements
 
     switch (monitoringEvent.getType()) {
     case START_MONITORING_CONTAINER:
-      ContainerStartMonitoringEvent startEvent =
-          (ContainerStartMonitoringEvent) monitoringEvent;
-      LOG.info("Starting resource-monitoring for " + containerId);
-      updateContainerMetrics(monitoringEvent);
-      trackingContainers.put(containerId,
-          new ProcessTreeInfo(containerId, null, null,
-              startEvent.getVmemLimit(), startEvent.getPmemLimit(),
-              startEvent.getCpuVcores()));
+      onStartMonitoringContainer(monitoringEvent, containerId);
       break;
     case STOP_MONITORING_CONTAINER:
-      LOG.info("Stopping resource-monitoring for " + containerId);
-      updateContainerMetrics(monitoringEvent);
-      trackingContainers.remove(containerId);
+      onStopMonitoringContainer(monitoringEvent, containerId);
       break;
     case CHANGE_MONITORING_CONTAINER_RESOURCE:
-      ChangeMonitoringContainerResourceEvent changeEvent =
-          (ChangeMonitoringContainerResourceEvent) monitoringEvent;
-      ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
-      if (processTreeInfo == null) {
-        LOG.warn("Failed to track container "
-            + containerId.toString()
-            + ". It may have already completed.");
-        break;
-      }
-      LOG.info("Changing resource-monitoring for " + containerId);
-      updateContainerMetrics(monitoringEvent);
-      long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
-      long vmemLimit = (long) (pmemLimit * vmemRatio);
-      int cpuVcores = changeEvent.getResource().getVirtualCores();
-      processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
-      changeContainerResource(containerId, changeEvent.getResource());
+      onChangeMonitoringContainerResource(monitoringEvent, containerId);
       break;
     default:
       // TODO: Wrong event.
     }
   }
+
+  protected void onChangeMonitoringContainerResource(
+      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+    ChangeMonitoringContainerResourceEvent changeEvent =
+        (ChangeMonitoringContainerResourceEvent) monitoringEvent;
+    ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
+    if (processTreeInfo == null) {
+      LOG.warn("Failed to track container "
+          + containerId.toString()
+          + ". It may have already completed.");
+      return;
+    }
+    LOG.info("Changing resource-monitoring for " + containerId);
+    updateContainerMetrics(monitoringEvent);
+    long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
+    long vmemLimit = (long) (pmemLimit * vmemRatio);
+    int cpuVcores = changeEvent.getResource().getVirtualCores();
+    processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
+    changeContainerResource(containerId, changeEvent.getResource());
+  }
+
+  protected void onStopMonitoringContainer(
+      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+    LOG.info("Stopping resource-monitoring for " + containerId);
+    updateContainerMetrics(monitoringEvent);
+    trackingContainers.remove(containerId);
+  }
+
+  protected void onStartMonitoringContainer(
+      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+    ContainerStartMonitoringEvent startEvent =
+        (ContainerStartMonitoringEvent) monitoringEvent;
+    LOG.info("Starting resource-monitoring for " + containerId);
+    updateContainerMetrics(monitoringEvent);
+    trackingContainers.put(containerId,
+        new ProcessTreeInfo(containerId, null, null,
+            startEvent.getVmemLimit(), startEvent.getPmemLimit(),
+            startEvent.getCpuVcores()));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
new file mode 100644
index 0000000..ef4e571
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Extension of {@link ContainerManagerImpl} that is used when queuing of
+ * containers at the NM is enabled.
+ */
+public class QueuingContainerManagerImpl extends ContainerManagerImpl {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(QueuingContainerManagerImpl.class);
+
+  private ConcurrentMap<ContainerId, AllocatedContainerInfo>
+        allocatedGuaranteedContainers;
+  private ConcurrentMap<ContainerId, AllocatedContainerInfo>
+        allocatedOpportunisticContainers;
+
+  private Queue<AllocatedContainerInfo> queuedGuaranteedContainers;
+  private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
+
+  private Set<ContainerId> opportunisticContainersToKill;
+
+  public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
+      DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
+      NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
+    super(context, exec, deletionContext, nodeStatusUpdater, metrics,
+        dirsHandler);
+    this.allocatedGuaranteedContainers = new ConcurrentHashMap<>();
+    this.allocatedOpportunisticContainers = new ConcurrentHashMap<>();
+    this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>();
+    this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
+    this.opportunisticContainersToKill = Collections.synchronizedSet(
+        new HashSet<ContainerId>());
+  }
+
+  @Override
+  protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
+    return new QueuingApplicationEventDispatcher(
+        super.createApplicationEventDispatcher());
+  }
+
+  @Override
+  protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      StartContainerRequest request) throws YarnException, IOException {
+    this.context.getQueuingContext().getQueuedContainers().put(
+        containerTokenIdentifier.getContainerID(), containerTokenIdentifier);
+
+    AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
+        containerTokenIdentifier, nmTokenIdentifier, request,
+        containerTokenIdentifier.getExecutionType(), containerTokenIdentifier
+            .getResource(), getConfig());
+
+    // If there are already free resources for the container to start, and
+    // there are no queued containers waiting to be executed, start this
+    // container immediately.
+    if (queuedGuaranteedContainers.isEmpty() &&
+        queuedOpportunisticContainers.isEmpty() &&
+        getContainersMonitor().
+            hasResourcesAvailable(allocatedContInfo.getPti())) {
+      startAllocatedContainer(allocatedContInfo);
+    } else {
+      if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
+        queuedGuaranteedContainers.add(allocatedContInfo);
+        // Kill running opportunistic containers to make space for
+        // guaranteed container.
+        killOpportunisticContainers(allocatedContInfo);
+      } else {
+        queuedOpportunisticContainers.add(allocatedContInfo);
+      }
+    }
+  }
+
+  /**
+   * Stops the given container. If the container has not started yet and is
+   * still registered as queued, its request is removed from the queue and
+   * recorded as a killed queued container; in every other case the stop is
+   * delegated to the superclass.
+   *
+   * @param containerID
+   *          the container to stop.
+   * @throws YarnException
+   *           if the superclass fails to stop the container.
+   * @throws IOException
+   *           if the superclass fails to stop the container.
+   */
+  @Override
+  protected void stopContainerInternal(ContainerId containerID)
+      throws YarnException, IOException {
+    Container container = this.context.getContainers().get(containerID);
+    // If container is null and distributed scheduling is enabled, container
+    // might be queued. Otherwise, container might not be handled by this NM.
+    // A single remove() both tests for and claims the queued entry, so a
+    // concurrent removal cannot hand us a null token identifier.
+    ContainerTokenIdentifier containerTokenId = (container == null)
+        ? this.context.getQueuingContext().getQueuedContainers()
+            .remove(containerID)
+        : null;
+
+    if (containerTokenId == null) {
+      // Not a queued container: let the regular stop path deal with it.
+      super.stopContainerInternal(containerID);
+      return;
+    }
+
+    boolean foundInQueue = removeQueuedContainer(containerID,
+        containerTokenId.getExecutionType());
+
+    if (foundInQueue) {
+      this.context.getQueuingContext().getKilledQueuedContainers().put(
+          containerTokenId,
+          "Queued container request removed by ApplicationMaster.");
+    } else {
+      // The container started execution in the meanwhile.
+      try {
+        stopContainerInternalIfRunning(containerID);
+      } catch (YarnException | IOException e) {
+        LOG.error("Container did not get removed successfully.", e);
+      }
+    }
+
+    // The queued request has been fully handled above, so the superclass is
+    // deliberately not invoked a second time for it.
+    nodeStatusUpdater.sendOutofBandHeartBeat();
+  }
+
+  /**
+   * Start the execution of the given container. Also add it to the allocated
+   * containers, and update allocated resource utilization.
+   */
+  private void startAllocatedContainer(
+      AllocatedContainerInfo allocatedContainerInfo) {
+    ProcessTreeInfo pti = allocatedContainerInfo.getPti();
+
+    if (allocatedContainerInfo.getExecutionType() ==
+        ExecutionType.GUARANTEED) {
+      allocatedGuaranteedContainers.put(pti.getContainerId(),
+          allocatedContainerInfo);
+    } else {
+      allocatedOpportunisticContainers.put(pti.getContainerId(),
+          allocatedContainerInfo);
+    }
+
+    getContainersMonitor().increaseContainersAllocation(pti);
+
+    // Start execution of container.
+    ContainerId containerId = allocatedContainerInfo
+        .getContainerTokenIdentifier().getContainerID();
+    this.context.getQueuingContext().getQueuedContainers().remove(containerId);
+    try {
+      super.startContainerInternal(
+          allocatedContainerInfo.getNMTokenIdentifier(),
+          allocatedContainerInfo.getContainerTokenIdentifier(),
+          allocatedContainerInfo.getStartRequest());
+    } catch (YarnException | IOException e) {
+      containerFailedToStart(pti.getContainerId(),
+          allocatedContainerInfo.getContainerTokenIdentifier());
+      LOG.error("Container failed to start.", e);
+    }
+  }
+
+  private void containerFailedToStart(ContainerId containerId,
+      ContainerTokenIdentifier containerTokenId) {
+    this.context.getQueuingContext().getQueuedContainers().remove(containerId);
+
+    removeAllocatedContainer(containerId);
+
+    this.context.getQueuingContext().getKilledQueuedContainers().put(
+        containerTokenId,
+        "Container removed from queue as it failed to start.");
+  }
+
+  /**
+   * Removes the given container from the queue that matches its execution
+   * type. Only the first matching entry is removed.
+   *
+   * @param containerId
+   *          the container to look for.
+   * @param executionType
+   *          selects the guaranteed queue when GUARANTEED, the opportunistic
+   *          queue otherwise.
+   * @return true if the container was found in the selected queue.
+   */
+  private boolean removeQueuedContainer(ContainerId containerId,
+      ExecutionType executionType) {
+    Queue<AllocatedContainerInfo> targetQueue;
+    if (executionType == ExecutionType.GUARANTEED) {
+      targetQueue = queuedGuaranteedContainers;
+    } else {
+      targetQueue = queuedOpportunisticContainers;
+    }
+
+    for (Iterator<AllocatedContainerInfo> it = targetQueue.iterator();
+        it.hasNext();) {
+      AllocatedContainerInfo queuedInfo = it.next();
+      if (queuedInfo.getPti().getContainerId().equals(containerId)) {
+        it.remove();
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Drops the given container from the allocated containers (guaranteed
+   * first, then opportunistic) and, if it was tracked in either map, gives
+   * its allocation back to the containers monitor.
+   */
+  private void removeAllocatedContainer(ContainerId containerId) {
+    AllocatedContainerInfo removedInfo =
+        allocatedGuaranteedContainers.remove(containerId);
+    if (removedInfo == null) {
+      removedInfo = allocatedOpportunisticContainers.remove(containerId);
+    }
+
+    if (removedInfo == null) {
+      // The container was not allocated, so there is nothing to release.
+      return;
+    }
+    getContainersMonitor().decreaseContainersAllocation(
+        removedInfo.getPti());
+  }
+
+  /**
+   * Stop a container only if it is currently running. If queued, do not stop
+   * it.
+   */
+  private void stopContainerInternalIfRunning(ContainerId containerID)
+      throws YarnException, IOException {
+    if (this.context.getContainers().containsKey(containerID)) {
+      stopContainerInternal(containerID);
+    }
+  }
+
+  /**
+   * Kill opportunistic containers to free up resources for running the given
+   * container.
+   *
+   * @param allocatedContInfo
+   *          the container whose execution needs to start by freeing up
+   *          resources occupied by opportunistic containers.
+   */
+  private void killOpportunisticContainers(
+      AllocatedContainerInfo allocatedContInfo) {
+    ContainerId containerToStartId = allocatedContInfo.getPti()
+        .getContainerId();
+    List<ContainerId> extraOpportContainersToKill =
+        pickOpportunisticContainersToKill(containerToStartId);
+
+    // Kill the opportunistic containers that were chosen.
+    for (ContainerId contIdToKill : extraOpportContainersToKill) {
+      try {
+        stopContainerInternalIfRunning(contIdToKill);
+      } catch (YarnException | IOException e) {
+        LOG.error("Container did not get removed successfully.", e);
+      }
+      LOG.info(
+          "Opportunistic container {} will be killed in order to start the "
+              + "execution of guaranteed container {}.",
+              contIdToKill, containerToStartId);
+    }
+  }
+
+  /**
+   * Choose the opportunistic containers to kill in order to free up resources
+   * for running the given container. Every container that is picked is also
+   * added to <code>opportunisticContainersToKill</code>, so that it is not
+   * picked again by a later invocation.
+   *
+   * @param containerToStartId
+   *          the container whose execution needs to start by freeing up
+   *          resources occupied by opportunistic containers.
+   * @return the additional opportunistic containers that need to be killed.
+   */
+  protected List<ContainerId> pickOpportunisticContainersToKill(
+      ContainerId containerToStartId) {
+    // The additional opportunistic containers that need to be killed for the
+    // given container to start.
+    List<ContainerId> extraOpportContainersToKill = new ArrayList<>();
+    // Track resources that need to be freed.
+    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+        containerToStartId);
+
+    // Go over the running opportunistic containers. Avoid containers that have
+    // already been marked for killing.
+    for (Map.Entry<ContainerId, AllocatedContainerInfo> runningOpportCont :
+        allocatedOpportunisticContainers.entrySet()) {
+      // If there are sufficient resources to execute the given container, do
+      // not kill more opportunistic containers.
+      if (nothingLeftToFree(resourcesToFreeUp)) {
+        break;
+      }
+
+      ContainerId runningOpportContId = runningOpportCont.getKey();
+      // add() returns false when the container is already marked for killing,
+      // which makes the check and the marking a single step.
+      if (opportunisticContainersToKill.add(runningOpportContId)) {
+        extraOpportContainersToKill.add(runningOpportContId);
+        getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp,
+            runningOpportCont.getValue().getPti());
+      }
+    }
+
+    // Evaluate the outcome after the loop: the last container that was picked
+    // may have been the one that freed enough resources.
+    if (!nothingLeftToFree(resourcesToFreeUp)) {
+      LOG.info(
+          "There are no sufficient resources to start guaranteed {} even after "
+              + "attempting to kill any running opportunistic containers.",
+          containerToStartId);
+    }
+
+    return extraOpportContainersToKill;
+  }
+
+  /**
+   * @return true if none of the tracked resources (physical memory, virtual
+   *         memory, CPU) still needs to be freed up.
+   */
+  private static boolean nothingLeftToFree(
+      ResourceUtilization resourcesToFreeUp) {
+    return resourcesToFreeUp.getPhysicalMemory() <= 0
+        && resourcesToFreeUp.getVirtualMemory() <= 0
+        && resourcesToFreeUp.getCPU() <= 0.0f;
+  }
+
+  /**
+   * Calculates the amount of resources that need to be freed up (by killing
+   * opportunistic containers) in order for the given guaranteed container to
+   * start its execution. Resource allocation to be freed up =
+   * <code>containersAllocation</code> -
+   *   allocation of <code>opportunisticContainersToKill</code> +
+   *   allocation of <code>queuedGuaranteedContainers</code> that will start
+   *     before the given container +
+   *   allocation of given container -
+   *   total resources of node.
+   *
+   * @param containerToStartId
+   *          the ContainerId of the guaranteed container for which we need to
+   *          free resources, so that its execution can start.
+   * @return the resources that need to be freed up for the given guaranteed
+   *         container to start.
+   */
+  private ResourceUtilization resourcesToFreeUp(
+      ContainerId containerToStartId) {
+    // Get allocation of currently allocated containers.
+    ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
+        .newInstance(getContainersMonitor().getContainersAllocation());
+
+    // Subtract from the allocation the allocation of the opportunistic
+    // containers that are marked for killing. The set is a synchronized
+    // wrapper, which requires holding its lock while iterating over it.
+    synchronized (opportunisticContainersToKill) {
+      for (ContainerId opportContId : opportunisticContainersToKill) {
+        // Single lookup: the container may finish (and leave the map)
+        // between a containsKey() and a subsequent get().
+        AllocatedContainerInfo opportContInfo =
+            allocatedOpportunisticContainers.get(opportContId);
+        if (opportContInfo != null) {
+          getContainersMonitor().decreaseResourceUtilization(
+              resourceAllocationToFreeUp, opportContInfo.getPti());
+        }
+      }
+    }
+    // Add to the allocation the allocation of the pending guaranteed
+    // containers that will start before the current container will be started.
+    // The given container itself is included before the loop stops.
+    for (AllocatedContainerInfo guarContInfo : queuedGuaranteedContainers) {
+      getContainersMonitor().increaseResourceUtilization(
+          resourceAllocationToFreeUp, guarContInfo.getPti());
+      if (guarContInfo.getPti().getContainerId().equals(containerToStartId)) {
+        break;
+      }
+    }
+    // Subtract the overall node resources.
+    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+        resourceAllocationToFreeUp);
+
+    return resourceAllocationToFreeUp;
+  }
+
+  /**
+   * If there are available resources, try to start as many pending containers
+   * as possible.
+   */
+  private void startPendingContainers() {
+    // Start pending guaranteed containers, if resources available.
+    boolean resourcesAvailable =
+        startContainersFromQueue(queuedGuaranteedContainers);
+
+    // Start opportunistic containers, if resources available.
+    if (resourcesAvailable) {
+      startContainersFromQueue(queuedOpportunisticContainers);
+    }
+  }
+
+  /**
+   * Starts containers from the head of the given queue for as long as the
+   * containers monitor reports available resources for the next one.
+   *
+   * @param queuedContainers
+   *          the queue to drain, in iteration order.
+   * @return true if every visited container could be started, false as soon
+   *         as one container did not fit.
+   */
+  private boolean startContainersFromQueue(
+      Queue<AllocatedContainerInfo> queuedContainers) {
+    Iterator<AllocatedContainerInfo> queueIter = queuedContainers.iterator();
+
+    while (queueIter.hasNext()) {
+      AllocatedContainerInfo nextInLine = queueIter.next();
+
+      if (!getContainersMonitor().hasResourcesAvailable(
+          nextInLine.getPti())) {
+        // The head of the queue does not fit; leave it and the rest queued.
+        return false;
+      }
+      startAllocatedContainer(nextInLine);
+      queueIter.remove();
+    }
+    return true;
+  }
+
+  /**
+   * Returns the status of the given container. A container that is not yet
+   * known to the context but is registered as queued is reported with state
+   * QUEUED; every other container is resolved by the superclass.
+   *
+   * @param containerID
+   *          the container whose status is requested.
+   * @param nmTokenIdentifier
+   *          passed through to the superclass.
+   * @return the status of the container.
+   * @throws YarnException
+   *           if the superclass cannot provide the status.
+   */
+  @Override
+  protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
+      NMTokenIdentifier nmTokenIdentifier) throws YarnException {
+    Container container = this.context.getContainers().get(containerID);
+    if (container == null) {
+      // Look the queued entry up exactly once; it may be removed concurrently
+      // (e.g. when the container starts), so repeated lookups could see null.
+      ContainerTokenIdentifier containerTokenId = this.context
+          .getQueuingContext().getQueuedContainers().get(containerID);
+      if (containerTokenId != null) {
+        return BuilderUtils.newContainerStatus(containerID,
+            org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "",
+            ContainerExitStatus.INVALID, containerTokenId.getResource(),
+            containerTokenId.getExecutionType());
+      }
+    }
+    return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
+  }
+
+  @VisibleForTesting
+  public int getNumAllocatedGuaranteedContainers() {
+    return allocatedGuaranteedContainers.size();
+  }
+
+  @VisibleForTesting
+  public int getNumAllocatedOpportunisticContainers() {
+    return allocatedOpportunisticContainers.size();
+  }
+
+  class QueuingApplicationEventDispatcher implements
+      EventHandler<ApplicationEvent> {
+    private EventHandler<ApplicationEvent> applicationEventDispatcher;
+
+    public QueuingApplicationEventDispatcher(
+        EventHandler<ApplicationEvent> applicationEventDispatcher) {
+      this.applicationEventDispatcher = applicationEventDispatcher;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handle(ApplicationEvent event) {
+      if (event.getType() ==
+          ApplicationEventType.APPLICATION_CONTAINER_FINISHED) {
+        if (!(event instanceof ApplicationContainerFinishedEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        ApplicationContainerFinishedEvent finishEvent =
+            (ApplicationContainerFinishedEvent) event;
+        // Remove finished container from the allocated containers, and
+        // attempt to start new containers.
+        ContainerId contIdToRemove = finishEvent.getContainerID();
+        removeAllocatedContainer(contIdToRemove);
+        opportunisticContainersToKill.remove(contIdToRemove);
+        startPendingContainers();
+      }
+      this.applicationEventDispatcher.handle(event);
+    }
+  }
+
+  /**
+   * Bundles everything needed to start a container at a later point (token
+   * identifiers, start request, execution type) together with the
+   * {@link ProcessTreeInfo} that carries its resource limits. Two instances
+   * are equal when they refer to the same container id.
+   */
+  static class AllocatedContainerInfo {
+    private final ContainerTokenIdentifier containerTokenIdentifier;
+    private final NMTokenIdentifier nmTokenIdentifier;
+    private final StartContainerRequest startRequest;
+    private final ExecutionType executionType;
+    private final ProcessTreeInfo pti;
+
+    AllocatedContainerInfo(ContainerTokenIdentifier containerTokenIdentifier,
+        NMTokenIdentifier nmTokenIdentifier, StartContainerRequest startRequest,
+        ExecutionType executionType, Resource resource, Configuration conf) {
+      this.containerTokenIdentifier = containerTokenIdentifier;
+      this.nmTokenIdentifier = nmTokenIdentifier;
+      this.startRequest = startRequest;
+      this.executionType = executionType;
+      this.pti = createProcessTreeInfo(containerTokenIdentifier
+          .getContainerID(), resource, conf);
+    }
+
+    private ContainerTokenIdentifier getContainerTokenIdentifier() {
+      return this.containerTokenIdentifier;
+    }
+
+    private NMTokenIdentifier getNMTokenIdentifier() {
+      return this.nmTokenIdentifier;
+    }
+
+    private StartContainerRequest getStartRequest() {
+      return this.startRequest;
+    }
+
+    private ExecutionType getExecutionType() {
+      return this.executionType;
+    }
+
+    protected ProcessTreeInfo getPti() {
+      return this.pti;
+    }
+
+    /**
+     * Builds the {@link ProcessTreeInfo} holding the limits of a container.
+     *
+     * @param containerId
+     *          the id of the container.
+     * @param resource
+     *          the resource of the container; its memory is multiplied by
+     *          1024 * 1024 to obtain the physical memory limit.
+     * @param conf
+     *          the configuration from which the vmem-pmem ratio is read.
+     * @return a ProcessTreeInfo without pid and process tree.
+     */
+    private static ProcessTreeInfo createProcessTreeInfo(
+        ContainerId containerId, Resource resource, Configuration conf) {
+      // Multiply in long arithmetic from the first factor on, so that large
+      // memory values cannot overflow an intermediate int product.
+      long pmemBytes = resource.getMemory() * 1024L * 1024L;
+      float vmemPmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
+          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+      long vmemBytes = (long) (vmemPmemRatio * pmemBytes);
+      int cpuVcores = resource.getVirtualCores();
+
+      return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes,
+          cpuVcores);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      boolean equal = false;
+      if (obj instanceof AllocatedContainerInfo) {
+        AllocatedContainerInfo otherContInfo = (AllocatedContainerInfo) obj;
+        equal = this.getPti().getContainerId()
+            .equals(otherContInfo.getPti().getContainerId());
+      }
+      return equal;
+    }
+
+    @Override
+    public int hashCode() {
+      return this.getPti().getContainerId().hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
new file mode 100644
index 0000000..0250807
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * This package contains classes related to the queuing of container
+ * requests at the NodeManager (NM).
+ *
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index fa8d131..2fcce1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 1aea9d2..6c904eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -683,5 +683,10 @@ public abstract class BaseAMRMProxyTest {
     public NodeStatusUpdater getNodeStatusUpdater() {
       return null;
     }
+
+    @Override
+    public QueuingContext getQueuingContext() {
+      return null; // this implementation supplies no queuing context
+    }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b56fc51b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 787778e..f37129e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -280,21 +280,22 @@ public abstract class BaseContainerManagerTest {
     list.add(containerID);
     GetContainerStatusesRequest request =
         GetContainerStatusesRequest.newInstance(list);
-    ContainerStatus containerStatus =
-        containerManager.getContainerStatuses(request).getContainerStatuses()
-          .get(0);
+    ContainerStatus containerStatus = null;
     int timeoutSecs = 0;
-      while (!containerStatus.getState().equals(finalState)
-          && timeoutSecs++ < timeOutMax) {
-          Thread.sleep(1000);
-          LOG.info("Waiting for container to get into state " + finalState
-              + ". Current state is " + containerStatus.getState());
-          containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
-        }
-        LOG.info("Container state is " + containerStatus.getState());
-        Assert.assertEquals("ContainerState is not correct (timedout)",
-            finalState, containerStatus.getState());
-      }
+    do {
+      Thread.sleep(2000);
+      containerStatus =
+          containerManager.getContainerStatuses(request)
+              .getContainerStatuses().get(0);
+      LOG.info("Waiting for container to get into state " + finalState
+          + ". Current state is " + containerStatus.getState());
+      timeoutSecs += 2;
+    } while (!containerStatus.getState().equals(finalState)
+        && timeoutSecs < timeOutMax);
+    LOG.info("Container state is " + containerStatus.getState());
+    Assert.assertEquals("ContainerState is not correct (timedout)",
+          finalState, containerStatus.getState());
+  }
 
   static void waitForApplicationState(ContainerManagerImpl containerManager,
       ApplicationId appID, ApplicationState finalState)
@@ -328,19 +329,24 @@ public abstract class BaseContainerManagerTest {
           org.apache.hadoop.yarn.server.nodemanager.containermanager
           .container.ContainerState finalState, int timeOutMax)
               throws InterruptedException, YarnException, IOException {
-    Container container =
-        containerManager.getContext().getContainers().get(containerID);
+    Container container = null;
     org.apache.hadoop.yarn.server.nodemanager
-        .containermanager.container.ContainerState currentState =
-            container.getContainerState();
+        .containermanager.container.ContainerState currentState = null;
     int timeoutSecs = 0;
-    while (!currentState.equals(finalState)
-        && timeoutSecs++ < timeOutMax) {
-      Thread.sleep(1000);
-      LOG.info("Waiting for NM container to get into state " + finalState
-          + ". Current state is " + currentState);
-      currentState = container.getContainerState();
-    }
+    do {
+      Thread.sleep(2000);
+      container =
+          containerManager.getContext().getContainers().get(containerID);
+      if (container != null) {
+        currentState = container.getContainerState();
+      }
+      if (currentState != null) {
+        LOG.info("Waiting for NM container to get into state " + finalState
+            + ". Current state is " + currentState);
+      }
+      timeoutSecs += 2;
+    } while (!currentState.equals(finalState)
+        && timeoutSecs++ < timeOutMax);
     LOG.info("Container state is " + currentState);
     Assert.assertEquals("ContainerState is not correct (timedout)",
         finalState, currentState);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message