hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvasu...@apache.org
Subject [35/50] [abbrv] hadoop git commit: YARN-4597. Introduce ContainerScheduler and a SCHEDULED state to NodeManager container lifecycle. (asuresh)
Date Wed, 16 Nov 2016 13:46:46 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 6dd1ac7..827bdb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -192,10 +193,10 @@ public abstract class BaseContainerManagerTest {
 
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
     // Default delSrvc
+    exec = createContainerExecutor();
     delSrvc = createDeletionService();
     delSrvc.init(conf);
 
-    exec = createContainerExecutor();
     dirsHandler = new LocalDirsHandlerService();
     nodeHealthChecker = new NodeHealthCheckerService(
         NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
@@ -288,32 +289,43 @@ public abstract class BaseContainerManagerTest {
       ContainerManagementProtocol containerManager, ContainerId containerID,
       ContainerState finalState)
       throws InterruptedException, YarnException, IOException {
-    waitForContainerState(containerManager, containerID, finalState, 20);
+    waitForContainerState(containerManager, containerID,
+        Arrays.asList(finalState), 20);
   }
 
   public static void waitForContainerState(
       ContainerManagementProtocol containerManager, ContainerId containerID,
       ContainerState finalState, int timeOutMax)
       throws InterruptedException, YarnException, IOException {
+    waitForContainerState(containerManager, containerID,
+        Arrays.asList(finalState), timeOutMax);
+  }
+
+  public static void waitForContainerState(
+      ContainerManagementProtocol containerManager, ContainerId containerID,
+      List<ContainerState> finalStates, int timeOutMax)
+      throws InterruptedException, YarnException, IOException {
     List<ContainerId> list = new ArrayList<ContainerId>();
     list.add(containerID);
     GetContainerStatusesRequest request =
         GetContainerStatusesRequest.newInstance(list);
     ContainerStatus containerStatus = null;
+    HashSet<ContainerState> fStates =
+        new HashSet<>(finalStates);
     int timeoutSecs = 0;
     do {
       Thread.sleep(2000);
       containerStatus =
           containerManager.getContainerStatuses(request)
               .getContainerStatuses().get(0);
-      LOG.info("Waiting for container to get into state " + finalState
+      LOG.info("Waiting for container to get into one of states " + fStates
           + ". Current state is " + containerStatus.getState());
       timeoutSecs += 2;
-    } while (!containerStatus.getState().equals(finalState)
+    } while (!fStates.contains(containerStatus.getState())
         && timeoutSecs < timeOutMax);
     LOG.info("Container state is " + containerStatus.getState());
-    Assert.assertEquals("ContainerState is not correct (timedout)",
-          finalState, containerStatus.getState());
+    Assert.assertTrue("ContainerState is not correct (timedout)",
+          fStates.contains(containerStatus.getState()));
   }
 
   public static void waitForApplicationState(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 32dddae..31546f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -94,6 +94,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
+
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -101,7 +105,6 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -551,6 +554,35 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
           throw new YarnException("Reject this container");
         }
       }
+      @Override
+      protected ContainerScheduler createContainerScheduler(Context context) {
+        return new ContainerScheduler(context, dispatcher, metrics){
+          @Override
+          public ContainersMonitor getContainersMonitor() {
+            return new ContainersMonitorImpl(null, null, null) {
+              @Override
+              public float getVmemRatio() {
+                return 2.0f;
+              }
+
+              @Override
+              public long getVmemAllocatedForContainers() {
+                return 20480;
+              }
+
+              @Override
+              public long getPmemAllocatedForContainers() {
+                return 10240;
+              }
+
+              @Override
+              public long getVCoresAllocatedForContainers() {
+                return 4;
+              }
+            };
+          }
+        };
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRegression.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRegression.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRegression.java
deleted file mode 100644
index 71af76f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRegression.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager;
-
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
-
-/**
- * Test class that invokes all test cases of {@link TestContainerManager} while
- * using the {@link QueuingContainerManagerImpl}. The goal is to assert that
- * no regression is introduced in the existing cases when no queuing of tasks at
- * the NMs is involved.
- */
-public class TestContainerManagerRegression extends TestContainerManager {
-
-  public TestContainerManagerRegression()
-      throws UnsupportedFileSystemException {
-    super();
-  }
-
-  static {
-    LOG = LogFactory.getLog(TestContainerManagerRegression.class);
-  }
-
-  @Override
-  protected ContainerManagerImpl createContainerManager(
-      DeletionService delSrvc) {
-    return new QueuingContainerManagerImpl(context, exec, delSrvc,
-        nodeStatusUpdater, metrics, dirsHandler) {
-      @Override
-      public void
-          setBlockNewContainerRequests(boolean blockNewContainerRequests) {
-        // do nothing
-      }
-
-      @Override
-      protected UserGroupInformation getRemoteUgi() throws YarnException {
-        ApplicationId appId = ApplicationId.newInstance(0, 0);
-        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
-            appId, 1);
-        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(
-            appAttemptId.toString());
-        ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
-            .getNodeId(), user, context.getNMTokenSecretManager()
-                .getCurrentKey().getKeyId()));
-        return ugi;
-      }
-
-      @Override
-      protected void authorizeGetAndStopContainerRequest(
-          ContainerId containerId, Container container, boolean stopRequest,
-          NMTokenIdentifier identifier) throws YarnException {
-        if (container == null || container.getUser().equals("Fail")) {
-          throw new YarnException("Reject this container");
-        }
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 766a1f9..33f4609 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.atLeastOnce;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -90,6 +91,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@@ -143,7 +149,7 @@ public class TestContainer {
       Map<Path, List<String>> localPaths = wc.localizeResources();
 
       // all resources should be localized
-      assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
       assertNotNull(wc.c.getLocalizedResources());
       for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
           .entrySet()) {
@@ -421,7 +427,7 @@ public class TestContainer {
       wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
       wc.initContainer();
       wc.localizeResources();
-      assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
       ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
@@ -452,7 +458,7 @@ public class TestContainer {
       wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
       wc.initContainer();
       wc.localizeResources();
-      assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
       wc.containerSuccessful();
@@ -480,7 +486,7 @@ public class TestContainer {
       wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
       wc.initContainer();
       wc.localizeResources();
-      assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
       wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
@@ -507,7 +513,7 @@ public class TestContainer {
       wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
       wc.initContainer();
       wc.localizeResources();
-      assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
       ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
       launcher.call();
       wc.drainDispatcherEvents();
@@ -764,7 +770,7 @@ public class TestContainer {
         new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
             LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
             LocalResourceVisibility.APPLICATION));
-    verify(wc.localizerBus).handle(argThat(matchesReq));
+    verify(wc.localizerBus, atLeastOnce()).handle(argThat(matchesReq));
   }
 
   private void verifyOutofBandHeartBeat(WrappedContainer wc) {
@@ -890,6 +896,7 @@ public class TestContainer {
     final EventHandler<AuxServicesEvent> auxBus;
     final EventHandler<ApplicationEvent> appBus;
     final EventHandler<LogHandlerEvent> LogBus;
+    final EventHandler<ContainerSchedulerEvent> schedBus;
     final ContainersLauncher launcher;
 
     final ContainerLaunchContext ctxt;
@@ -927,9 +934,16 @@ public class TestContainer {
       auxBus = mock(EventHandler.class);
       appBus = mock(EventHandler.class);
       LogBus = mock(EventHandler.class);
+      schedBus = new ContainerScheduler(context, dispatcher, metrics, 0) {
+        @Override
+        protected void scheduleContainer(Container container) {
+          container.sendLaunchEvent();
+        }
+      };
       dispatcher.register(LocalizationEventType.class, localizerBus);
       dispatcher.register(ContainersLauncherEventType.class, launcherBus);
       dispatcher.register(ContainersMonitorEventType.class, monitorBus);
+      dispatcher.register(ContainerSchedulerEventType.class, schedBus);
       dispatcher.register(AuxServicesEventType.class, auxBus);
       dispatcher.register(ApplicationEventType.class, appBus);
       dispatcher.register(LogHandlerEventType.class, LogBus);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java
deleted file mode 100644
index 7f06afa..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Class for testing the {@link QueuingContainerManagerImpl}.
- */
-public class TestQueuingContainerManager extends BaseContainerManagerTest {
-  public TestQueuingContainerManager() throws UnsupportedFileSystemException {
-    super();
-  }
-
-  static {
-    LOG = LogFactory.getLog(TestQueuingContainerManager.class);
-  }
-
-  boolean shouldDeleteWait = false;
-
-  @Override
-  protected ContainerManagerImpl createContainerManager(
-      DeletionService delSrvc) {
-    return new QueuingContainerManagerImpl(context, exec, delSrvc,
-        nodeStatusUpdater, metrics, dirsHandler) {
-      @Override
-      public void
-      setBlockNewContainerRequests(boolean blockNewContainerRequests) {
-        // do nothing
-      }
-
-      @Override
-      protected UserGroupInformation getRemoteUgi() throws YarnException {
-        ApplicationId appId = ApplicationId.newInstance(0, 0);
-        ApplicationAttemptId appAttemptId =
-            ApplicationAttemptId.newInstance(appId, 1);
-        UserGroupInformation ugi =
-            UserGroupInformation.createRemoteUser(appAttemptId.toString());
-        ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
-            .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
-            .getKeyId()));
-        return ugi;
-      }
-
-      @Override
-      protected ContainersMonitor createContainersMonitor(
-          ContainerExecutor exec) {
-        return new ContainersMonitorImpl(exec, dispatcher, this.context) {
-          // Define resources available for containers to be executed.
-          @Override
-          public long getPmemAllocatedForContainers() {
-            return 2048 * 1024 * 1024L;
-          }
-
-          @Override
-          public long getVmemAllocatedForContainers() {
-            float pmemRatio = getConfig().getFloat(
-                YarnConfiguration.NM_VMEM_PMEM_RATIO,
-                YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-            return (long) (pmemRatio * getPmemAllocatedForContainers());
-          }
-
-          @Override
-          public long getVCoresAllocatedForContainers() {
-            return 4;
-          }
-        };
-      }
-    };
-  }
-
-  @Override
-  protected DeletionService createDeletionService() {
-    return new DeletionService(exec) {
-      @Override
-      public void delete(String user, Path subDir, Path... baseDirs) {
-        // Don't do any deletions.
-        if (shouldDeleteWait) {
-          try {
-            Thread.sleep(10000);
-            LOG.info("\n\nSleeping Pseudo delete : user - " + user + ", " +
-                "subDir - " + subDir + ", " +
-                "baseDirs - " + Arrays.asList(baseDirs));
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        } else {
-          LOG.info("\n\nPseudo delete : user - " + user + ", " +
-              "subDir - " + subDir + ", " +
-              "baseDirs - " + Arrays.asList(baseDirs));
-        }
-      }
-    };
-  }
-
-  @Override
-  public void setup() throws IOException {
-    super.setup();
-    shouldDeleteWait = false;
-  }
-
-  /**
-   * Starting one GUARANTEED and one OPPORTUNISTIC container.
-   * @throws Exception
-   */
-  @Test
-  public void testStartMultipleContainers() throws Exception {
-    shouldDeleteWait = true;
-    containerManager.start();
-
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    List<StartContainerRequest> list = new ArrayList<>();
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(1024, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.GUARANTEED)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(1024, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    containerManager.startContainers(allRequests);
-
-    BaseContainerManagerTest.waitForContainerState(containerManager,
-        createContainerId(0),
-        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
-    BaseContainerManagerTest.waitForContainerState(containerManager,
-        createContainerId(1),
-        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
-
-    // Ensure all containers are running.
-    List<ContainerId> statList = new ArrayList<ContainerId>();
-    for (int i = 0; i < 2; i++) {
-      statList.add(createContainerId(i));
-    }
-    GetContainerStatusesRequest statRequest =
-        GetContainerStatusesRequest.newInstance(statList);
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      Assert.assertEquals(
-          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-          status.getState());
-    }
-  }
-
-  /**
-   * Submit both a GUARANTEED and an OPPORTUNISTIC container, each of which
-   * requires more resources than available at the node, and make sure they
-   * are both queued.
-   * @throws Exception
-   */
-  @Test
-  public void testQueueMultipleContainers() throws Exception {
-    shouldDeleteWait = true;
-    containerManager.start();
-
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    List<StartContainerRequest> list = new ArrayList<>();
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(3072, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.GUARANTEED)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(3072, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    containerManager.startContainers(allRequests);
-
-    Thread.sleep(5000);
-
-    // Ensure both containers are queued.
-    List<ContainerId> statList = new ArrayList<ContainerId>();
-    for (int i = 0; i < 2; i++) {
-      statList.add(createContainerId(i));
-    }
-    GetContainerStatusesRequest statRequest =
-        GetContainerStatusesRequest.newInstance(statList);
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      Assert.assertEquals(
-          org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
-          status.getState());
-    }
-
-    // Ensure both containers are properly queued.
-    Assert.assertEquals(2, containerManager.getContext().getQueuingContext()
-        .getQueuedContainers().size());
-    Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager)
-        .getNumQueuedGuaranteedContainers());
-    Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager)
-        .getNumQueuedOpportunisticContainers());
-  }
-
-  /**
-   * Starts one OPPORTUNISTIC container that takes up the whole node's
-   * resources, and submit two more that will be queued.
-   * @throws Exception
-   */
-  @Test
-  public void testStartAndQueueMultipleContainers() throws Exception {
-    shouldDeleteWait = true;
-    containerManager.start();
-
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    List<StartContainerRequest> list = new ArrayList<>();
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(2048, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(1024, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(1024, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    containerManager.startContainers(allRequests);
-
-    Thread.sleep(5000);
-
-    // Ensure first container is running and others are queued.
-    List<ContainerId> statList = new ArrayList<ContainerId>();
-    for (int i = 0; i < 3; i++) {
-      statList.add(createContainerId(i));
-    }
-    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
-        .newInstance(Arrays.asList(createContainerId(0)));
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getContainerId().equals(createContainerId(0))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-            status.getState());
-      } else {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
-            status.getState());
-      }
-    }
-
-    // Ensure two containers are properly queued.
-    Assert.assertEquals(2, containerManager.getContext().getQueuingContext()
-        .getQueuedContainers().size());
-    Assert.assertEquals(0, ((QueuingContainerManagerImpl) containerManager)
-        .getNumQueuedGuaranteedContainers());
-    Assert.assertEquals(2, ((QueuingContainerManagerImpl) containerManager)
-        .getNumQueuedOpportunisticContainers());
-  }
-
-  /**
-   * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources
-   * requests by each container as such that only one can run in parallel.
-   * Thus, the OPPORTUNISTIC container that started running, will be
-   * killed for the GUARANTEED container to start.
-   * Once the GUARANTEED container finishes its execution, the remaining
-   * OPPORTUNISTIC container will be executed.
-   * @throws Exception
-   */
-  @Test
-  public void testKillOpportunisticForGuaranteedContainer() throws Exception {
-    shouldDeleteWait = true;
-    containerManager.start();
-
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    List<StartContainerRequest> list = new ArrayList<>();
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(2048, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(2048, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(2048, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.GUARANTEED)));
-
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    containerManager.startContainers(allRequests);
-
-    BaseContainerManagerTest.waitForNMContainerState(containerManager,
-        createContainerId(0), ContainerState.DONE, 40);
-    Thread.sleep(5000);
-
-    // Get container statuses. Container 0 should be killed, container 1
-    // should be queued and container 2 should be running.
-    List<ContainerId> statList = new ArrayList<ContainerId>();
-    for (int i = 0; i < 3; i++) {
-      statList.add(createContainerId(i));
-    }
-    GetContainerStatusesRequest statRequest =
-        GetContainerStatusesRequest.newInstance(statList);
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getContainerId().equals(createContainerId(0))) {
-        Assert.assertTrue(status.getDiagnostics()
-            .contains("Container killed by the ApplicationMaster"));
-      } else if (status.getContainerId().equals(createContainerId(1))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
-            status.getState());
-      } else if (status.getContainerId().equals(createContainerId(2))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-            status.getState());
-      }
-      System.out.println("\nStatus : [" + status + "]\n");
-    }
-
-    // Make sure the remaining OPPORTUNISTIC container starts its execution.
-    BaseContainerManagerTest.waitForNMContainerState(containerManager,
-        createContainerId(2), ContainerState.DONE, 40);
-    Thread.sleep(5000);
-    statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList(
-        createContainerId(1)));
-    ContainerStatus contStatus1 = containerManager.getContainerStatuses(
-        statRequest).getContainerStatuses().get(0);
-    Assert.assertEquals(
-        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-        contStatus1.getState());
-  }
-
-  /**
-   * Submit three OPPORTUNISTIC containers that can run concurrently, and one
-   * GUARANTEED that needs to kill two of the OPPORTUNISTIC for it to run.
-   * @throws Exception
-   */
-  @Test
-  public void testKillMultipleOpportunisticContainers() throws Exception {
-    shouldDeleteWait = true;
-    containerManager.start();
-
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    List<StartContainerRequest> list = new ArrayList<>();
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(512, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(512, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(512, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(1500, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.GUARANTEED)));
-
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    containerManager.startContainers(allRequests);
-
-    BaseContainerManagerTest.waitForNMContainerState(
-        containerManager, createContainerId(0),
-            Arrays.asList(ContainerState.DONE,
-                ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL), 40);
-    Thread.sleep(5000);
-
-    // Get container statuses. Container 0 should be killed, container 1
-    // should be queued and container 2 should be running.
-    int killedContainers = 0;
-    List<ContainerId> statList = new ArrayList<ContainerId>();
-    for (int i = 0; i < 4; i++) {
-      statList.add(createContainerId(i));
-    }
-    GetContainerStatusesRequest statRequest =
-        GetContainerStatusesRequest.newInstance(statList);
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getDiagnostics().contains(
-          "Container killed by the ApplicationMaster")) {
-        killedContainers++;
-      }
-      System.out.println("\nStatus : [" + status + "]\n");
-    }
-
-    Assert.assertEquals(2, killedContainers);
-  }
-
-  /**
-   * Start running one GUARANTEED container and queue two OPPORTUNISTIC ones.
-   * Try killing one of the two queued containers.
-   * @throws Exception
-   */
-  @Test
-  public void testStopQueuedContainer() throws Exception {
-    shouldDeleteWait = true;
-    containerManager.start();
-
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    List<StartContainerRequest> list = new ArrayList<>();
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(2048, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.GUARANTEED)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(512, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-    list.add(StartContainerRequest.newInstance(
-        containerLaunchContext,
-        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
-            context.getNodeId(),
-            user, BuilderUtils.newResource(512, 1),
-            context.getContainerTokenSecretManager(), null,
-            ExecutionType.OPPORTUNISTIC)));
-
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    containerManager.startContainers(allRequests);
-
-    Thread.sleep(2000);
-
-    // Assert there is initially one container running and two queued.
-    int runningContainersNo = 0;
-    int queuedContainersNo = 0;
-    List<ContainerId> statList = new ArrayList<ContainerId>();
-    for (int i = 0; i < 3; i++) {
-      statList.add(createContainerId(i));
-    }
-    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
-        .newInstance(statList);
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getState() ==
-          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
-        runningContainersNo++;
-      } else if (status.getState() ==
-          org.apache.hadoop.yarn.api.records.ContainerState.QUEUED) {
-        queuedContainersNo++;
-      }
-      System.out.println("\nStatus : [" + status + "]\n");
-    }
-
-    Assert.assertEquals(1, runningContainersNo);
-    Assert.assertEquals(2, queuedContainersNo);
-
-    // Stop one of the two queued containers.
-    StopContainersRequest stopRequest = StopContainersRequest.
-        newInstance(Arrays.asList(createContainerId(1)));
-    containerManager.stopContainers(stopRequest);
-
-    Thread.sleep(2000);
-
-    // Assert queued container got properly stopped.
-    statList.clear();
-    for (int i = 0; i < 3; i++) {
-      statList.add(createContainerId(i));
-    }
-    statRequest = GetContainerStatusesRequest.newInstance(statList);
-    containerStatuses = containerManager.getContainerStatuses(statRequest)
-        .getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getContainerId().equals(createContainerId(0))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-            status.getState());
-      } else if (status.getContainerId().equals(createContainerId(1))) {
-        Assert.assertTrue(status.getDiagnostics().contains(
-            "Queued container request removed"));
-      } else if (status.getContainerId().equals(createContainerId(2))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
-            status.getState());
-      }
-      System.out.println("\nStatus : [" + status + "]\n");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
new file mode 100644
index 0000000..24e388f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -0,0 +1,872 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.spy;
+
+/**
+ * Tests to verify that the {@link ContainerScheduler} is able to queue and
+ * make room for containers.
+ */
+public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
+  public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException {
+    super(); // nothing extra to construct; defer to the base test class
+  }
+
+  static { // re-point the inherited LOG at a logger named for this class
+    LOG = LogFactory.getLog(TestContainerSchedulerQueuing.class);
+  }
+
+  private boolean delayContainers = true; // true: launchContainer sleeps 10s
+
+  @Override
+  protected ContainerManagerImpl createContainerManager(
+      DeletionService delSrvc) {
+    return new ContainerManagerImpl(context, exec, delSrvc,
+        nodeStatusUpdater, metrics, dirsHandler) {
+      @Override
+      public void
+      setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+        // No-op: requests to block new containers are ignored in this test.
+      }
+
+      @Override
+      protected UserGroupInformation getRemoteUgi() throws YarnException {
+        ApplicationId appId = ApplicationId.newInstance(0, 0);
+        ApplicationAttemptId appAttemptId =
+            ApplicationAttemptId.newInstance(appId, 1);
+        UserGroupInformation ugi =
+            UserGroupInformation.createRemoteUser(appAttemptId.toString());
+        ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+            .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+            .getKeyId()));
+        return ugi; // remote user carrying an NMToken for attempt 1 of app 0
+      }
+
+      @Override
+      protected ContainersMonitor createContainersMonitor(
+          ContainerExecutor exec) {
+        return new ContainersMonitorImpl(exec, dispatcher, this.context) {
+          // Fixed container capacity for the tests: 2 GiB pmem and 4 vcores.
+          @Override
+          public long getPmemAllocatedForContainers() {
+            return 2048 * 1024 * 1024L;
+          }
+
+          @Override
+          public long getVmemAllocatedForContainers() {
+            float pmemRatio = getConfig().getFloat(
+                YarnConfiguration.NM_VMEM_PMEM_RATIO,
+                YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+            return (long) (pmemRatio * getPmemAllocatedForContainers());
+          }
+
+          @Override
+          public long getVCoresAllocatedForContainers() {
+            return 4;
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  protected ContainerExecutor createContainerExecutor() {
+    DefaultContainerExecutor exec = new DefaultContainerExecutor() {
+      @Override
+      public int launchContainer(ContainerStartContext ctx) throws IOException {
+        if (delayContainers) { // optionally hold back the real launch by 10s
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible
+          }
+        }
+        return super.launchContainer(ctx);
+      }
+    };
+    exec.setConf(conf);
+    return spy(exec); // Mockito spy wrapping the delaying executor
+  }
+
+  @Override
+  public void setup() throws IOException {
+    conf.setInt( // max OPPORTUNISTIC queue length for these tests: 10
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
+    super.setup(); // base setup runs after the override is in conf
+  }
+
+  /**
+   * Starts one GUARANTEED and one OPPORTUNISTIC container; both must run.
+   * @throws Exception if starting or querying the containers fails
+   */
+  @Test
+  public void testStartMultipleContainers() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForContainerState(containerManager,
+        createContainerId(0),
+        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
+    BaseContainerManagerTest.waitForContainerState(containerManager,
+        createContainerId(1),
+        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
+
+    // Re-query both containers and check that each one reports RUNNING.
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 2; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+          status.getState());
+    }
+  }
+
+  /**
+   * Submits both a GUARANTEED and an OPPORTUNISTIC container, each of which
+   * requests more resources than are available at the node, and makes sure
+   * both are queued (reported as SCHEDULED) by the ContainerScheduler.
+   * @throws Exception if starting or querying the containers fails
+   */
+  @Test
+  public void testQueueMultipleContainers() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(3072, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(3072, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    Thread.sleep(5000);
+
+    // Ensure both containers are queued, i.e. report the SCHEDULED state.
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 2; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
+          status.getState());
+    }
+
+    ContainerScheduler containerScheduler =
+        containerManager.getContainerScheduler();
+    // Ensure the scheduler queue holds one container of each execution type.
+    Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
+    Assert.assertEquals(1,
+        containerScheduler.getNumQueuedGuaranteedContainers());
+    Assert.assertEquals(1,
+        containerScheduler.getNumQueuedOpportunisticContainers());
+  }
+
+  /**
+   * Starts one OPPORTUNISTIC container that takes up the whole node's
+   * resources, and submits two more that must stay queued (SCHEDULED).
+   * @throws Exception if starting or querying the containers fails
+   */
+  @Test
+  public void testStartAndQueueMultipleContainers() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    Thread.sleep(5000);
+
+    // Query all three: container 0 must be RUNNING, the others SCHEDULED.
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 3; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
+        .newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getContainerId().equals(createContainerId(0))) {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+            status.getState());
+      } else {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
+            status.getState());
+      }
+    }
+
+    ContainerScheduler containerScheduler =
+        containerManager.getContainerScheduler();
+    // Ensure two containers are properly queued.
+    Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
+    Assert.assertEquals(0,
+        containerScheduler.getNumQueuedGuaranteedContainers());
+    Assert.assertEquals(2,
+        containerScheduler.getNumQueuedOpportunisticContainers());
+  }
+
+  /**
+   * Submits two OPPORTUNISTIC containers and one GUARANTEED container. The
+   * resources requested by each are such that only one can run at a time.
+   * Thus, the OPPORTUNISTIC container that started running will be
+   * killed for the GUARANTEED container to start.
+   * Once the GUARANTEED container finishes its execution, the remaining
+   * OPPORTUNISTIC container will be executed.
+   * @throws Exception if starting, waiting for or querying a container fails
+   */
+  @Test
+  public void testKillOpportunisticForGuaranteedContainer() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(0), ContainerState.DONE, 40);
+    Thread.sleep(5000);
+
+    // Get container statuses. Container 0 should be killed, container 1
+    // should be queued (SCHEDULED) and container 2 should be running.
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 3; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getContainerId().equals(createContainerId(0))) {
+        Assert.assertTrue(status.getDiagnostics().contains(
+            "Container Killed to make room for Guaranteed Container"));
+      } else if (status.getContainerId().equals(createContainerId(1))) {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
+            status.getState());
+      } else if (status.getContainerId().equals(createContainerId(2))) {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+            status.getState());
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+
+    // Once container 2 is DONE, the queued OPPORTUNISTIC container 1 must run.
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(2), ContainerState.DONE, 40);
+    Thread.sleep(5000);
+    statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList(
+        createContainerId(1)));
+    ContainerStatus contStatus1 = containerManager.getContainerStatuses(
+        statRequest).getContainerStatuses().get(0);
+    Assert.assertEquals(
+        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+        contStatus1.getState());
+  }
+
+  /**
+   * 1. Submit a long running GUARANTEED container to hog all NM resources.
+   * 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued.
+   * 3. Update the queuing limit to a maximum queue length of 2.
+   * 4. Ensure only 2 containers remain in the queue, and 4 are de-queued.
+   * @throws Exception if a container manager call fails or a wait is interrupted.
+   */
+  @Test
+  public void testQueueShedding() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    containerLaunchContext.setCommands(Arrays.asList("sleep 100"));
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(4), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(5), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(6), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    allRequests = StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    ContainerScheduler containerScheduler =
+        containerManager.getContainerScheduler();
+    // Poll (at most 30 x 100 ms) until all 6 containers are queued.
+    int numTries = 30;
+    while ((containerScheduler.getNumQueuedContainers() < 6) &&
+        (numTries-- > 0)) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(6, containerScheduler.getNumQueuedContainers());
+
+    ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit
+        .newInstance();
+    containerQueuingLimit.setMaxQueueLength(2);
+    containerScheduler.updateQueuingLimit(containerQueuingLimit);
+    numTries = 30;
+    while ((containerScheduler.getNumQueuedContainers() > 2) &&
+        (numTries-- > 0)) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
+
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 1; i < 7; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+
+    int deQueuedContainers = 0;
+    int numQueuedOppContainers = 0;
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        if (status.getDiagnostics().contains(
+            "Container De-queued to meet NM queuing limits")) {
+          deQueuedContainers++;
+        }
+        if (status.getState() ==
+            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) {
+          numQueuedOppContainers++;
+        }
+      }
+    }
+    Assert.assertEquals(4, deQueuedContainers);
+    Assert.assertEquals(2, numQueuedOppContainers);
+  }
+
+  /**
+   * 1. Submit a long running GUARANTEED container to hog all NM resources.
+   * 2. Submit 2 OPPORTUNISTIC containers, both of which will be queued.
+   * 3. Send Stop Container to one of the queued containers.
+   * 4. Ensure the stopped container is removed from the queue.
+   * @throws Exception if a container manager call fails or a wait is interrupted.
+   */
+  @Test
+  public void testContainerDeQueuedAfterAMKill() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    containerLaunchContext.setCommands(Arrays.asList("sleep 100"));
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    allRequests = StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    ContainerScheduler containerScheduler =
+        containerManager.getContainerScheduler();
+    // Poll (at most 30 x 100 ms) until both containers are queued.
+    int numTries = 30;
+    while ((containerScheduler.getNumQueuedContainers() < 2) &&
+        (numTries-- > 0)) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
+
+    containerManager.stopContainers(
+        StopContainersRequest.newInstance(Arrays.asList(createContainerId(2))));
+
+    numTries = 30;
+    while ((containerScheduler.getNumQueuedContainers() > 1) &&
+        (numTries-- > 0)) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, containerScheduler.getNumQueuedContainers());
+  }
+
+  /**
+   * Submit three OPPORTUNISTIC containers that can run concurrently, and one
+   * GUARANTEED that needs to kill two of the OPPORTUNISTIC for it to run.
+   * @throws Exception if a container manager call fails or a wait is interrupted.
+   */
+  @Test
+  public void testKillMultipleOpportunisticContainers() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1500, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+
+    allRequests = StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForNMContainerState(
+        containerManager, createContainerId(0),
+            Arrays.asList(ContainerState.DONE,
+                ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL), 40);
+    Thread.sleep(5000);
+
+    // Get the statuses of all four containers and count those whose
+    // diagnostics say they were killed to make room for the Guaranteed one.
+    int killedContainers = 0;
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 4; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getDiagnostics().contains(
+          "Container Killed to make room for Guaranteed Container")) {
+        killedContainers++;
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+
+    Assert.assertEquals(2, killedContainers);
+  }
+
+  /**
+   * Submit four OPPORTUNISTIC containers that can run concurrently, and then
+   * two GUARANTEED that need to kill exactly two of the OPPORTUNISTIC for
+   * them to run. Make sure only 2 are killed.
+   * @throws Exception if a container manager call fails or a wait is interrupted.
+   */
+  @Test
+  public void testKillOnlyRequiredOpportunisticContainers() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    // Fill NM with Opportunistic containers
+    for (int i = 0; i < 4; i++) {
+      list.add(StartContainerRequest.newInstance(
+          containerLaunchContext,
+          createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
+              context.getNodeId(),
+              user, BuilderUtils.newResource(512, 1),
+              context.getContainerTokenSecretManager(), null,
+              ExecutionType.OPPORTUNISTIC)));
+    }
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    list = new ArrayList<>();
+    // Now ask for two Guaranteed containers
+    for (int i = 4; i < 6; i++) {
+      list.add(StartContainerRequest.newInstance(
+          containerLaunchContext,
+          createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
+              context.getNodeId(),
+              user, BuilderUtils.newResource(512, 1),
+              context.getContainerTokenSecretManager(), null,
+              ExecutionType.GUARANTEED)));
+    }
+
+    allRequests = StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(0), ContainerState.DONE, 40);
+    Thread.sleep(5000);
+
+    // Get the statuses of all six containers and count those whose
+    // diagnostics say they were killed to make room for a Guaranteed one.
+    int killedContainers = 0;
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 6; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getDiagnostics().contains(
+          "Container Killed to make room for Guaranteed Container")) {
+        killedContainers++;
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+
+    Assert.assertEquals(2, killedContainers);
+  }
+
+  /**
+   * Start running one GUARANTEED container and queue two OPPORTUNISTIC ones.
+   * Stop one of the two queued containers and verify that it is reported as
+   * COMPLETE while the GUARANTEED container is still RUNNING and the other
+   * OPPORTUNISTIC container is still SCHEDULED.
+   * @throws Exception if a container manager call fails or a wait is
+   *     interrupted.
+   */
+  @Test
+  public void testStopQueuedContainer() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    Thread.sleep(2000);
+
+    // Assert there is initially one container running and two queued.
+    int runningContainersNo = 0;
+    int queuedContainersNo = 0;
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 3; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
+        .newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getState() ==
+          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
+        runningContainersNo++;
+      } else if (status.getState() ==
+          org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) {
+        queuedContainersNo++;
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+
+    Assert.assertEquals(1, runningContainersNo);
+    Assert.assertEquals(2, queuedContainersNo);
+
+    // Stop one of the two queued containers.
+    StopContainersRequest stopRequest = StopContainersRequest.
+        newInstance(Arrays.asList(createContainerId(1)));
+    containerManager.stopContainers(stopRequest);
+
+    Thread.sleep(2000);
+
+    // Assert queued container got properly stopped.
+    statList.clear();
+    for (int i = 0; i < 3; i++) {
+      statList.add(createContainerId(i));
+    }
+
+    statRequest = GetContainerStatusesRequest.newInstance(statList);
+    HashMap<org.apache.hadoop.yarn.api.records.ContainerState, ContainerStatus>
+        map = new HashMap<>();
+    // Poll once per second, for at most 30 seconds, until a single status
+    // snapshot contains a RUNNING, a SCHEDULED and a COMPLETE container.
+    for (int i = 0; i < 30; i++) {
+      containerStatuses = containerManager.getContainerStatuses(statRequest)
+          .getContainerStatuses();
+      // Rebuild the map on every poll so that a state observed in an earlier
+      // poll cannot satisfy the exit condition or the assertions below.
+      map.clear();
+      for (ContainerStatus status : containerStatuses) {
+        System.out.println("\nStatus : [" + status + "]\n");
+        map.put(status.getState(), status);
+      }
+      if (map.containsKey(
+              org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) &&
+          map.containsKey(
+              org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) &&
+          map.containsKey(
+              org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)) {
+        // All three expected states are present; stop polling altogether.
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(createContainerId(0),
+        map.get(org.apache.hadoop.yarn.api.records.ContainerState.RUNNING)
+            .getContainerId());
+    Assert.assertEquals(createContainerId(1),
+        map.get(org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)
+            .getContainerId());
+    Assert.assertEquals(createContainerId(2),
+        map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED)
+            .getContainerId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 164488d..686a0d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -215,4 +215,19 @@ public class MockContainer implements Container {
   public void commitUpgrade() {
 
   }
+
+  @Override
+  public boolean isMarkedForKilling() {
+    return false; // This mock always reports that it is not marked.
+  }
+
+  @Override
+  public void sendLaunchEvent() {
+    // No-op: this mock does nothing when asked to send a launch event.
+  }
+
+  @Override
+  public void sendKillEvent(int exitStatus, String description) {
+    // No-op: this mock ignores both arguments and sends no kill event.
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 50a9c4d..d3db96f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -313,9 +313,11 @@ public class OpportunisticContainerAllocatorAMService
           appAttempt.getApplicationAttemptId(), container.getNodeId(),
           appAttempt.getUser(), rmContext, isRemotelyAllocated);
       appAttempt.addRMContainer(container.getId(), rmContainer);
+      ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
+          container.getNodeId()).allocateContainer(rmContainer);
       rmContainer.handle(
           new RMContainerEvent(container.getId(),
-              RMContainerEventType.LAUNCHED));
+              RMContainerEventType.ACQUIRED));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index f5d8b5b..0cfa1d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -80,8 +80,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
         RMContainerEventType.KILL)
     .addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
-    .addTransition(RMContainerState.NEW, RMContainerState.RUNNING,
-        RMContainerEventType.LAUNCHED)
+    .addTransition(RMContainerState.NEW, RMContainerState.ACQUIRED,
+        RMContainerEventType.ACQUIRED, new AcquiredTransition())
     .addTransition(RMContainerState.NEW,
         EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
         RMContainerEventType.RECOVER, new ContainerRecoveredTransition())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 302743a..ab13000 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1394,32 +1394,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       }
 
       // Process running containers
-      if (remoteContainer.getState() == ContainerState.RUNNING) {
-        // Process only GUARANTEED containers in the RM.
-        if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
-          ++numRemoteRunningContainers;
-          if (!launchedContainers.contains(containerId)) {
-            // Just launched container. RM knows about it the first time.
-            launchedContainers.add(containerId);
-            newlyLaunchedContainers.add(remoteContainer);
-            // Unregister from containerAllocationExpirer.
-            containerAllocationExpirer
-                .unregister(new AllocationExpirationInfo(containerId));
-          }
-        }
-      } else {
-        if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
-          // A finished container
-          launchedContainers.remove(containerId);
+      if (remoteContainer.getState() == ContainerState.RUNNING ||
+          remoteContainer.getState() == ContainerState.SCHEDULED) {
+        ++numRemoteRunningContainers;
+        if (!launchedContainers.contains(containerId)) {
+          // Just launched container. RM knows about it the first time.
+          launchedContainers.add(containerId);
+          newlyLaunchedContainers.add(remoteContainer);
           // Unregister from containerAllocationExpirer.
           containerAllocationExpirer
               .unregister(new AllocationExpirationInfo(containerId));
         }
-        // Completed containers should also include the OPPORTUNISTIC containers
-        // so that the AM gets properly notified.
+      } else {
+        // A finished container
+        launchedContainers.remove(containerId);
         if (completedContainers.add(containerId)) {
           newlyCompletedContainers.add(remoteContainer);
         }
+        // Unregister from containerAllocationExpirer.
+        containerAllocationExpirer
+            .unregister(new AllocationExpirationInfo(containerId));
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message