hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haiboc...@apache.org
Subject hadoop git commit: YARN-6706. Refactor ContainerScheduler to make oversubscription change easier. (Haibo Chen via asuresh)
Date Mon, 17 Jul 2017 21:26:19 GMT
Repository: hadoop
Updated Branches:
  refs/heads/YARN-1011 8e458246c -> 4c501b46d


YARN-6706. Refactor ContainerScheduler to make oversubscription change easier. (Haibo Chen
via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c501b46
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c501b46
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c501b46

Branch: refs/heads/YARN-1011
Commit: 4c501b46dd3674bdd2471a431014bc6d12ffd703
Parents: 8e45824
Author: Arun Suresh <asuresh@apache.org>
Authored: Mon Jul 17 14:07:23 2017 -0700
Committer: Haibo Chen <haibochen@apache.org>
Committed: Mon Jul 17 14:25:51 2017 -0700

----------------------------------------------------------------------
 .../scheduler/ContainerScheduler.java           | 135 +++++++++++++------
 .../TestContainerManagerRecovery.java           |   2 +-
 .../TestContainerSchedulerQueuing.java          |  85 ++++++++++++
 3 files changed, 177 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c501b46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 24530b3..19243ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -192,7 +192,9 @@ public class ContainerScheduler extends AbstractService implements
     // decrement only if it was a running container
     Container completedContainer = runningContainers.remove(container
         .getContainerId());
-    if (completedContainer != null) {
+    // only a running container releases resources upon completion
+    boolean resourceReleased = completedContainer != null;
+    if (resourceReleased) {
       this.utilizationTracker.subtractContainerResource(container);
       if (container.getContainerTokenIdentifier().getExecutionType() ==
           ExecutionType.OPPORTUNISTIC) {
@@ -218,8 +220,7 @@ public class ContainerScheduler extends AbstractService implements
     boolean resourcesAvailable = true;
     while (cIter.hasNext() && resourcesAvailable) {
       Container container = cIter.next();
-      if (this.utilizationTracker.hasResourcesAvailable(container)) {
-        startAllocatedContainer(container);
+      if (tryStartContainer(container)) {
         cIter.remove();
       } else {
         resourcesAvailable = false;
@@ -228,50 +229,95 @@ public class ContainerScheduler extends AbstractService implements
     return resourcesAvailable;
   }
 
-  @VisibleForTesting
-  protected void scheduleContainer(Container container) {
-    if (maxOppQueueLength <= 0) {
-      startAllocatedContainer(container);
-      return;
+  private boolean tryStartContainer(Container container) {
+    boolean containerStarted = false;
+    if (resourceAvailableToStartContainer(container)) {
+      startContainer(container);
+      containerStarted = true;
     }
-    if (queuedGuaranteedContainers.isEmpty() &&
-        queuedOpportunisticContainers.isEmpty() &&
-        this.utilizationTracker.hasResourcesAvailable(container)) {
-      startAllocatedContainer(container);
+    return containerStarted;
+  }
+
+  /**
+   * Check if there are resources available to start a given container
+   * immediately. (This can be extended to include overallocated resources)
+   * @param container the container to start
+   * @return true if container can be launched directly
+   */
+  private boolean resourceAvailableToStartContainer(Container container) {
+    return this.utilizationTracker.hasResourcesAvailable(container);
+  }
+
+  private boolean enqueueContainer(Container container) {
+    boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
+        getExecutionType() == ExecutionType.GUARANTEED;
+
+    boolean isQueued;
+    if (isGuaranteedContainer) {
+      queuedGuaranteedContainers.put(container.getContainerId(), container);
+      isQueued = true;
     } else {
-      LOG.info("No available resources for container {} to start its execution "
-          + "immediately.", container.getContainerId());
-      boolean isQueued = true;
-      if (container.getContainerTokenIdentifier().getExecutionType() ==
-          ExecutionType.GUARANTEED) {
-        queuedGuaranteedContainers.put(container.getContainerId(), container);
-        // Kill running opportunistic containers to make space for
-        // guaranteed container.
-        killOpportunisticContainers(container);
+      if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
+        LOG.info("Opportunistic container {} will be queued at the NM.",
+            container.getContainerId());
+        queuedOpportunisticContainers.put(
+            container.getContainerId(), container);
+        isQueued = true;
       } else {
-        if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
-          LOG.info("Opportunistic container {} will be queued at the NM.",
-              container.getContainerId());
-          queuedOpportunisticContainers.put(
-              container.getContainerId(), container);
-        } else {
-          isQueued = false;
-          LOG.info("Opportunistic container [{}] will not be queued at the NM" +
-              "since max queue length [{}] has been reached",
-              container.getContainerId(), maxOppQueueLength);
-          container.sendKillEvent(
-              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
-              "Opportunistic container queue is full.");
-        }
+        LOG.info("Opportunistic container [{}] will not be queued at the NM" +
+                "since max queue length [{}] has been reached",
+            container.getContainerId(), maxOppQueueLength);
+        container.sendKillEvent(
+            ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+            "Opportunistic container queue is full.");
+        isQueued = false;
       }
-      if (isQueued) {
-        try {
-          this.context.getNMStateStore().storeContainerQueued(
-              container.getContainerId());
-        } catch (IOException e) {
-          LOG.warn("Could not store container [" + container.getContainerId()
-              + "] state. The Container has been queued.", e);
-        }
+    }
+
+    if (isQueued) {
+      try {
+        this.context.getNMStateStore().storeContainerQueued(
+            container.getContainerId());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + container.getContainerId()
+            + "] state. The Container has been queued.", e);
+      }
+    }
+
+    return isQueued;
+  }
+
+  @VisibleForTesting
+  protected void scheduleContainer(Container container) {
+    boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
+        getExecutionType() == ExecutionType.GUARANTEED;
+
+    // Given a guaranteed container, we enqueue it first and then try to start
+    // as many queuing guaranteed containers as possible followed by queuing
+    // opportunistic containers based on remaining resources available. If the
+    // container still stays in the queue afterwards, we need to preempt just
+    // enough opportunistic containers.
+    if (isGuaranteedContainer) {
+      enqueueContainer(container);
+      startPendingContainers();
+
+      // if the guaranteed container is queued, we need to preempt opportunistic
+      // containers to make room for it
+      if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
+        killOpportunisticContainers(container);
+      }
+    } else {
+      // Given an opportunistic container, we first try to start as many queuing
+      // guaranteed containers as possible followed by queuing opportunistic
+      // containers based on remaining resource available, then enqueue the
+      // opportunistic container. If the container is enqueued, we do another
+      // pass to try to start the newly enqueued opportunistic container.
+      startPendingContainers();
+      boolean containerQueued = enqueueContainer(container);
+      // container may not get queued because the max opportunistic container
+      // queue length is reached. If so, there is no point doing another pass
+      if (containerQueued) {
+        startPendingContainers();
       }
     }
   }
@@ -292,7 +338,7 @@ public class ContainerScheduler extends AbstractService implements
     }
   }
 
-  private void startAllocatedContainer(Container container) {
+  private void startContainer(Container container) {
     LOG.info("Starting container [" + container.getContainerId()+ "]");
     runningContainers.put(container.getContainerId(), container);
     this.utilizationTracker.addContainerResources(container);
@@ -416,4 +462,5 @@ public class ContainerScheduler extends AbstractService implements
   public ContainersMonitor getContainersMonitor() {
     return this.context.getContainerManager().getContainersMonitor();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c501b46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 075d857..b1a7b4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -583,7 +583,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest
{
 
               @Override
               public long getPmemAllocatedForContainers() {
-                return 10240;
+                return (long) 2048 << 20;
               }
 
               @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c501b46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 8264f2e..aeba399 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -332,6 +332,91 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest
{
   }
 
   /**
+   * Starts one GUARANTEED container that takes up the whole node's resources,
+   * and submits more OPPORTUNISTIC containers than the opportunistic container
+   * queue can hold. OPPORTUNISTIC containers that cannot be queued should be
+   * killed.
+   * @throws Exception
+   */
+  @Test
+  public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+
+    final int maxOppQueueLength = conf.getInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+        YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH);
+    for (int i = 1; i < maxOppQueueLength + 2; i++) {
+      list.add(StartContainerRequest.newInstance(
+          containerLaunchContext,
+          createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
+              context.getNodeId(),
+              user, BuilderUtils.newResource(2048, 1),
+              context.getContainerTokenSecretManager(), null,
+              ExecutionType.OPPORTUNISTIC)));
+    }
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(0), ContainerState.RUNNING, 40);
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(maxOppQueueLength + 1), ContainerState.DONE,
+        40);
+    Thread.sleep(5000);
+
+    // Get container statuses. Container 0 should be running and container
+    // 1 to maxOppQueueLength should be queued and the last container should
+    // be killed
+    List<ContainerId> statList = new ArrayList<>();
+    for (int i = 0; i < maxOppQueueLength + 2; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getContainerId().equals(createContainerId(0))) {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+            status.getState());
+      } else if (status.getContainerId().equals(createContainerId(
+          maxOppQueueLength + 1))) {
+        Assert.assertTrue(status.getDiagnostics().contains(
+            "Opportunistic container queue is full"));
+      } else {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
+            status.getState());
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+
+    ContainerScheduler containerScheduler =
+        containerManager.getContainerScheduler();
+    Assert.assertEquals(maxOppQueueLength,
+        containerScheduler.getNumQueuedContainers());
+    Assert.assertEquals(0,
+        containerScheduler.getNumQueuedGuaranteedContainers());
+    Assert.assertEquals(maxOppQueueLength,
+        containerScheduler.getNumQueuedOpportunisticContainers());
+  }
+
+  /**
    * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources
    * requests by each container as such that only one can run in parallel.
    * Thus, the OPPORTUNISTIC container that started running, will be


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message