hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [19/50] [abbrv] hadoop git commit: YARN-2883. Queuing of container requests in the NM. (Konstantinos Karanasos and Arun Suresh via kasha)
Date Fri, 22 Apr 2016 18:58:42 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8172f5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 3f5fc82..702198e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
@@ -176,7 +178,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     // Just do a query for a non-existing container.
     boolean throwsException = false;
     try {
-      List<ContainerId> containerIds = new ArrayList<ContainerId>();
+      List<ContainerId> containerIds = new ArrayList<>();
       ContainerId id =createContainerId(0);
       containerIds.add(id);
       GetContainerStatusesRequest request =
@@ -231,14 +233,14 @@ public class TestContainerManager extends BaseContainerManagerTest {
           containerLaunchContext,
           createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
     containerManager.startContainers(allRequests);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
-        ContainerState.COMPLETE);
+        ContainerState.COMPLETE, 40);
 
     // Now ascertain that the resources are localised correctly.
     ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
@@ -323,7 +325,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           createContainerToken(cId,
             DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
             context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -355,7 +357,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     Assert.assertTrue("Process is not alive!",
       DefaultContainerExecutor.containerIsAlive(pid));
 
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    List<ContainerId> containerIds = new ArrayList<>();
     containerIds.add(cId);
     StopContainersRequest stopRequest =
         StopContainersRequest.newInstance(containerIds);
@@ -375,7 +377,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
       DefaultContainerExecutor.containerIsAlive(pid));
   }
 
-  private void testContainerLaunchAndExit(int exitCode) throws IOException,
+  protected void testContainerLaunchAndExit(int exitCode) throws IOException,
       InterruptedException, YarnException {
 
 	  File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
@@ -430,7 +432,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           containerLaunchContext,
           createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -439,12 +441,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
 	  BaseContainerManagerTest.waitForContainerState(containerManager, cId,
 			  ContainerState.COMPLETE);
 
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    List<ContainerId> containerIds = new ArrayList<>();
     containerIds.add(cId);
     GetContainerStatusesRequest gcsRequest =
         GetContainerStatusesRequest.newInstance(containerIds);
-	  ContainerStatus containerStatus = 
-			  containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+    ContainerStatus containerStatus = containerManager.
+        getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
 
 	  // Verify exit status matches exit state of script
 	  Assert.assertEquals(exitCode,
@@ -520,7 +522,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           containerLaunchContext,
           createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -605,7 +607,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           createContainerToken(cId1,
             ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(startRequest1);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -635,7 +637,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           createContainerToken(cId2,
             DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
             context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list2 = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list2 = new ArrayList<>();
     list.add(startRequest2);
     StartContainersRequest allRequests2 =
         StartContainersRequest.newInstance(list2);
@@ -655,7 +657,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
   public void testMultipleContainersLaunch() throws Exception {
     containerManager.start();
 
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
     for (int i = 0; i < 10; i++) {
@@ -679,6 +681,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     StartContainersResponse response =
         containerManager.startContainers(requestList);
+    Thread.sleep(5000);
 
     Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size());
     for (ContainerId id : response.getSuccessfullyStartedContainers()) {
@@ -699,12 +702,11 @@ public class TestContainerManager extends BaseContainerManagerTest {
   @Test
   public void testMultipleContainersStopAndGetStatus() throws Exception {
     containerManager.start();
-    List<StartContainerRequest> startRequest =
-        new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> startRequest = new ArrayList<>();
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
 
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    List<ContainerId> containerIds = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
       ContainerId cId = createContainerId(i);
       String user = null;
@@ -727,6 +729,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     StartContainersRequest requestList =
         StartContainersRequest.newInstance(startRequest);
     containerManager.startContainers(requestList);
+    Thread.sleep(5000);
 
     // Get container statuses
     GetContainerStatusesRequest statusRequest =
@@ -777,8 +780,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         ServiceA.class, Service.class);
     containerManager.start();
 
-    List<StartContainerRequest> startRequest =
-        new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> startRequest = new ArrayList<>();
 
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
@@ -803,8 +805,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     StartContainersResponse response =
         containerManager.startContainers(requestList);
-    Assert.assertTrue(response.getFailedRequests().size() == 1);
-    Assert.assertTrue(response.getSuccessfullyStartedContainers().size() == 0);
+    Assert.assertEquals(1, response.getFailedRequests().size());
+    Assert.assertEquals(0, response.getSuccessfullyStartedContainers().size());
     Assert.assertTrue(response.getFailedRequests().containsKey(cId));
     Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
         .contains("The auxService:" + serviceName + " does not exist"));
@@ -880,8 +882,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         ContainerManagerImpl.INVALID_NMTOKEN_MSG);
 
     Mockito.doNothing().when(spyContainerMgr).authorizeUser(ugInfo, null);
-    List<StartContainerRequest> reqList
-        = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> reqList = new ArrayList<>();
     reqList.add(StartContainerRequest.newInstance(null, null));
     StartContainersRequest reqs = new StartContainersRequestPBImpl();
     reqs.setStartContainerRequests(reqList);
@@ -925,7 +926,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     Thread.sleep(2000);
     // Construct container resource increase request,
-    List<Token> increaseTokens = new ArrayList<Token>();
+    List<Token> increaseTokens = new ArrayList<>();
     // Add increase request for container-0, the request will fail as the
     // container will have exited, and won't be in RUNNING state
     ContainerId cId0 = createContainerId(0);
@@ -1012,7 +1013,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
             containerLaunchContext,
             createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -1022,7 +1023,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         org.apache.hadoop.yarn.server.nodemanager.
             containermanager.container.ContainerState.RUNNING);
     // Construct container resource increase request,
-    List<Token> increaseTokens = new ArrayList<Token>();
+    List<Token> increaseTokens = new ArrayList<>();
     // Add increase request. The increase request should fail
     // as the current resource does not fit in the target resource
     Token containerToken =
@@ -1096,7 +1097,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
                 createContainerToken(cId, DUMMY_RM_IDENTIFIER,
                     context.getNodeId(), user,
                         context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -1106,7 +1107,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         org.apache.hadoop.yarn.server.nodemanager.
             containermanager.container.ContainerState.RUNNING);
     // Construct container resource increase request,
-    List<Token> increaseTokens = new ArrayList<Token>();
+    List<Token> increaseTokens = new ArrayList<>();
     // Add increase request.
     Resource targetResource = Resource.newInstance(4096, 2);
     Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
@@ -1184,6 +1185,21 @@ public class TestContainerManager extends BaseContainerManagerTest {
             containerTokenIdentifier);
   }
 
+  // Builds a container token whose identifier carries the given ExecutionType.
+  public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+      NodeId nodeId, String user, Resource resource,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext, ExecutionType executionType)
+      throws IOException {
+    long hundredSecondsFromNow = System.currentTimeMillis() + 100000L;
+    ContainerTokenIdentifier tokenId = new ContainerTokenIdentifier(cId,
+        nodeId.toString(), user, resource, hundredSecondsFromNow, 123,
+        rmIdentifier, Priority.newInstance(0), 0, logAggregationContext, null,
+        ContainerType.TASK, executionType);
+    return BuilderUtils.newContainerToken(nodeId,
+        containerTokenSecretManager.retrievePassword(tokenId), tokenId);
+  }
+
   @Test
   public void testOutputThreadDumpSignal() throws IOException,
       InterruptedException, YarnException {
@@ -1241,7 +1257,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         new HashMap<String, LocalResource>();
     localResources.put(destinationFile, rsrc_alpha);
     containerLaunchContext.setLocalResources(localResources);
-    List<String> commands = new ArrayList<String>();
+    List<String> commands = new ArrayList<>();
     commands.add("/bin/bash");
     commands.add(scriptFile.getAbsolutePath());
     containerLaunchContext.setCommands(commands);
@@ -1250,7 +1266,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
             containerLaunchContext,
             createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -1267,7 +1283,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
     SignalContainerRequest signalReq =
         SignalContainerRequest.newInstance(cId, command);
-    List<SignalContainerRequest> reqs = new ArrayList<SignalContainerRequest>();
+    List<SignalContainerRequest> reqs = new ArrayList<>();
     reqs.add(signalReq);
     containerManager.handle(new CMgrSignalContainersEvent(reqs));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8172f5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
index bbde9ed..0dc5c5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
@@ -22,6 +22,10 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 
 public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
 
+  public MockResourceCalculatorPlugin() {
+    super(null);
+  }
+
   @Override
   public long getVirtualMemorySize() {
     return 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8172f5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index d7f89fc..1a0c690 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -173,8 +174,8 @@ public class TestContainersMonitorResourceChange {
     assertTrue(containerEventHandler
         .isContainerKilled(getContainerId(1)));
     // create container 2
-    containersMonitor.handle(new ContainerStartMonitoringEvent(
-        getContainerId(2), 2202009L, 1048576L, 1, 0, 0));
+    containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId(
+        2), 2202009L, 1048576L, 1, 0, 0));
     // verify that this container is properly tracked
     assertNotNull(getProcessTreeInfo(getContainerId(2)));
     assertEquals(1048576L, getProcessTreeInfo(getContainerId(2))
@@ -215,8 +216,8 @@ public class TestContainersMonitorResourceChange {
     // now waiting for the next monitor cycle
     Thread.sleep(1000);
     // create a container with id 3
-    containersMonitor.handle(new ContainerStartMonitoringEvent(
-        getContainerId(3), 2202009L, 1048576L, 1, 0, 0));
+    containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId(
+        3), 2202009L, 1048576L, 1, 0, 0));
     // Verify that this container has been tracked
     assertNotNull(getProcessTreeInfo(getContainerId(3)));
     // trigger a change resource event, check limit after change

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8172f5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java
new file mode 100644
index 0000000..0d951f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
+    .ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestQueuingContainerManager extends TestContainerManager {
+
+  /** Answers ContainersMonitorImpl#hasResourcesAvailable in these tests. */
+  interface HasResources {
+    boolean decide(Context context, ContainerId cId);
+  }
+
+  public TestQueuingContainerManager() throws UnsupportedFileSystemException {
+    super();
+  }
+
+  static {
+    LOG = LogFactory.getLog(TestQueuingContainerManager.class);
+  }
+
+  HasResources hasResources = null;
+  boolean shouldDeleteWait = false;
+
+  /**
+   * Builds a QueuingContainerManagerImpl wired for testing: mock resource
+   * calculator classes are configured, a fixed remote UGI is returned, and
+   * resource availability is delegated to {@link #hasResources}.
+   */
+  @Override
+  protected ContainerManagerImpl
+  createContainerManager(DeletionService delSrvc) {
+    return new QueuingContainerManagerImpl(context, exec, delSrvc,
+        nodeStatusUpdater, metrics, dirsHandler) {
+
+      @Override
+      public void serviceInit(Configuration conf) throws Exception {
+        conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+            MockResourceCalculatorPlugin.class.getCanonicalName());
+        conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+            MockResourceCalculatorProcessTree.class.getCanonicalName());
+        super.serviceInit(conf);
+      }
+
+      @Override
+      public void
+      setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+        // do nothing
+      }
+
+      @Override
+      protected UserGroupInformation getRemoteUgi() throws YarnException {
+        ApplicationId appId = ApplicationId.newInstance(0, 0);
+        ApplicationAttemptId appAttemptId =
+            ApplicationAttemptId.newInstance(appId, 1);
+        UserGroupInformation ugi =
+            UserGroupInformation.createRemoteUser(appAttemptId.toString());
+        ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+            .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+            .getKeyId()));
+        return ugi;
+      }
+
+      @Override
+      protected void authorizeGetAndStopContainerRequest(
+          ContainerId containerId, Container container, boolean stopRequest,
+          NMTokenIdentifier identifier) throws YarnException {
+        if (container == null || container.getUser().equals("Fail")) {
+          throw new YarnException("Reject this container");
+        }
+      }
+
+      @Override
+      protected ContainersMonitor createContainersMonitor(ContainerExecutor
+          exec) {
+        return new ContainersMonitorImpl(exec, dispatcher, this.context) {
+          @Override
+          public boolean hasResourcesAvailable(
+              ContainersMonitorImpl.ProcessTreeInfo pti) {
+            return hasResources.decide(this.context, pti.getContainerId());
+          }
+        };
+      }
+    };
+  }
+
+  /**
+   * Returns a DeletionService that never deletes anything; when
+   * {@link #shouldDeleteWait} is set, each delete call first sleeps 10s.
+   */
+  @Override
+  protected DeletionService createDeletionService() {
+    return new DeletionService(exec) {
+      @Override
+      public void delete(String user, Path subDir, Path... baseDirs) {
+        // Don't do any deletions.
+        if (shouldDeleteWait) {
+          try {
+            Thread.sleep(10000);
+            LOG.info("\n\nSleeping Pseudo delete : user - " + user + ", " +
+                "subDir - " + subDir + ", " +
+                "baseDirs - " + Arrays.asList(baseDirs));
+          } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller instead of hiding it.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while delaying pseudo delete", e);
+          }
+        } else {
+          LOG.info("\n\nPseudo delete : user - " + user + ", " +
+              "subDir - " + subDir + ", " +
+              "baseDirs - " + Arrays.asList(baseDirs));
+        }
+      }
+    };
+  }
+
+  @Override
+  public void setup() throws IOException {
+    super.setup();
+    shouldDeleteWait = false;
+    hasResources = new HasResources() {
+      @Override
+      public boolean decide(Context context, ContainerId cId) {
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Test to verify that an OPPORTUNISTIC container is killed when
+   * a GUARANTEED container arrives and all the Node Resources are used up.
+   *
+   * For this specific test case, 4 containers are requested (last one being
+   * guaranteed). Assumptions :
+   * 1) The first OPPORTUNISTIC Container will start running
+   * 2) The second and third OPP containers will be queued
+   * 3) When the GUARANTEED container comes in, the running OPP container
+   *    will be killed to make room
+   * 4) After the GUARANTEED container finishes, the remaining 2 OPP
+   *    containers will be dequeued and run.
+   * 5) Only the first OPP container will be killed.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleOpportunisticContainer() throws Exception {
+    shouldDeleteWait = true;
+    containerManager.start();
+
+    // ////// Create the resources for the container
+    File dir = new File(tmpDir, "dir");
+    dir.mkdirs();
+    File file = new File(dir, "file");
+    // try-with-resources closes the writer even if the write fails.
+    try (PrintWriter fileWriter = new PrintWriter(file)) {
+      fileWriter.write("Hello World!");
+    }
+
+    // ////// Construct the container-spec.
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    URL resource_alpha = ConverterUtils.getYarnUrlFromPath(localFS
+        .makeQualified(new Path(file.getAbsolutePath())));
+    LocalResource rsrc_alpha =
+        recordFactory.newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(file.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources = new HashMap<>();
+    localResources.put(destinationFile, rsrc_alpha);
+    containerLaunchContext.setLocalResources(localResources);
+
+    // Start 3 OPPORTUNISTIC containers and 1 GUARANTEED container
+    List<StartContainerRequest> list = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      list.add(StartContainerRequest.newInstance(
+          containerLaunchContext,
+          createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
+              context.getNodeId(), user, BuilderUtils.newResource(1024, 1),
+              context.getContainerTokenSecretManager(), null,
+              ExecutionType.OPPORTUNISTIC)));
+    }
+    // GUARANTEED
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, context.getContainerTokenSecretManager())));
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+
+    // Plugin to simulate that the Node is full
+    // It only allows 1 container to run at a time.
+    hasResources = new HasResources() {
+      @Override
+      public boolean decide(Context context, ContainerId cId) {
+        int nOpp = ((QueuingContainerManagerImpl) containerManager)
+            .getNumAllocatedOpportunisticContainers();
+        int nGuar = ((QueuingContainerManagerImpl) containerManager)
+            .getNumAllocatedGuaranteedContainers();
+        boolean val = (nOpp + nGuar < 1);
+        System.out.println("\nHasResources : [" + cId + "]," +
+            "Opp[" + nOpp + "], Guar[" + nGuar + "], [" + val + "]\n");
+        return val;
+      }
+    };
+
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForContainerState(containerManager,
+        createContainerId(3), ContainerState.COMPLETE, 40);
+    List<ContainerId> statList = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    boolean sawFirstOpp = false;
+    for (ContainerStatus status : containerStatuses) {
+      // Ensure that the first opportunistic container is killed
+      if (status.getContainerId().equals(createContainerId(0))) {
+        sawFirstOpp = true;
+        Assert.assertTrue(status.getDiagnostics()
+            .contains("Container killed by the ApplicationMaster"));
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+    // Without this the loop above passes vacuously when no status comes back.
+    Assert.assertTrue("No status returned for container 0", sawFirstOpp);
+  }
+}


Mime
View raw message