Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 274E6200CD9 for ; Thu, 20 Jul 2017 00:35:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 25DDF168898; Wed, 19 Jul 2017 22:35:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 20CEC16887B for ; Thu, 20 Jul 2017 00:35:21 +0200 (CEST) Received: (qmail 48425 invoked by uid 500); 19 Jul 2017 22:34:56 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 46327 invoked by uid 99); 19 Jul 2017 22:34:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jul 2017 22:34:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D289DF5549; Wed, 19 Jul 2017 22:34:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: xyao@apache.org To: common-commits@hadoop.apache.org Date: Wed, 19 Jul 2017 22:35:21 -0000 Message-Id: <239738a68ffa4969b60fcda47dec6d8a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [29/50] [abbrv] hadoop git commit: YARN-6706. Refactor ContainerScheduler to make oversubscription change easier. (Haibo Chen via asuresh) archived-at: Wed, 19 Jul 2017 22:35:23 -0000 YARN-6706. Refactor ContainerScheduler to make oversubscription change easier. 
(Haibo Chen via asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b007921 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b007921 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b007921 Branch: refs/heads/HDFS-7240 Commit: 5b007921cdf01ecc8ed97c164b7d327b8304c529 Parents: ed27f2b Author: Arun Suresh Authored: Mon Jul 17 14:07:23 2017 -0700 Committer: Arun Suresh Committed: Mon Jul 17 14:11:14 2017 -0700 ---------------------------------------------------------------------- .../scheduler/ContainerScheduler.java | 135 +++++++++++++------ .../TestContainerManagerRecovery.java | 2 +- .../TestContainerSchedulerQueuing.java | 85 ++++++++++++ 3 files changed, 177 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b007921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 24530b3..19243ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -192,7 +192,9 @@ public class ContainerScheduler extends AbstractService implements // decrement only if it was a running container Container completedContainer = runningContainers.remove(container .getContainerId()); - if (completedContainer != null) { + // only a running container releases resources upon completion + boolean resourceReleased = completedContainer != null; + if (resourceReleased) { this.utilizationTracker.subtractContainerResource(container); if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { @@ -218,8 +220,7 @@ public class ContainerScheduler extends AbstractService implements boolean resourcesAvailable = true; while (cIter.hasNext() && resourcesAvailable) { Container container = cIter.next(); - if (this.utilizationTracker.hasResourcesAvailable(container)) { - startAllocatedContainer(container); + if (tryStartContainer(container)) { cIter.remove(); } else { resourcesAvailable = false; @@ -228,50 +229,95 @@ public class ContainerScheduler extends AbstractService implements return resourcesAvailable; } - @VisibleForTesting - protected void scheduleContainer(Container container) { - if (maxOppQueueLength <= 0) { - startAllocatedContainer(container); - return; + private boolean tryStartContainer(Container container) { + boolean containerStarted = false; + if (resourceAvailableToStartContainer(container)) { + startContainer(container); + containerStarted = true; } - if (queuedGuaranteedContainers.isEmpty() && - queuedOpportunisticContainers.isEmpty() && - this.utilizationTracker.hasResourcesAvailable(container)) { - startAllocatedContainer(container); + return containerStarted; + } + + /** + * Check if there is resource available to start a given container + * immediately. 
(This can be extended to include overallocated resources) + * @param container the container to start + * @return true if container can be launched directly + */ + private boolean resourceAvailableToStartContainer(Container container) { + return this.utilizationTracker.hasResourcesAvailable(container); + } + + private boolean enqueueContainer(Container container) { + boolean isGuaranteedContainer = container.getContainerTokenIdentifier(). + getExecutionType() == ExecutionType.GUARANTEED; + + boolean isQueued; + if (isGuaranteedContainer) { + queuedGuaranteedContainers.put(container.getContainerId(), container); + isQueued = true; } else { - LOG.info("No available resources for container {} to start its execution " - + "immediately.", container.getContainerId()); - boolean isQueued = true; - if (container.getContainerTokenIdentifier().getExecutionType() == - ExecutionType.GUARANTEED) { - queuedGuaranteedContainers.put(container.getContainerId(), container); - // Kill running opportunistic containers to make space for - // guaranteed container. 
- killOpportunisticContainers(container); + if (queuedOpportunisticContainers.size() < maxOppQueueLength) { + LOG.info("Opportunistic container {} will be queued at the NM.", + container.getContainerId()); + queuedOpportunisticContainers.put( + container.getContainerId(), container); + isQueued = true; } else { - if (queuedOpportunisticContainers.size() <= maxOppQueueLength) { - LOG.info("Opportunistic container {} will be queued at the NM.", - container.getContainerId()); - queuedOpportunisticContainers.put( - container.getContainerId(), container); - } else { - isQueued = false; - LOG.info("Opportunistic container [{}] will not be queued at the NM" + - "since max queue length [{}] has been reached", - container.getContainerId(), maxOppQueueLength); - container.sendKillEvent( - ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, - "Opportunistic container queue is full."); - } + LOG.info("Opportunistic container [{}] will not be queued at the NM" + + "since max queue length [{}] has been reached", + container.getContainerId(), maxOppQueueLength); + container.sendKillEvent( + ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, + "Opportunistic container queue is full."); + isQueued = false; } - if (isQueued) { - try { - this.context.getNMStateStore().storeContainerQueued( - container.getContainerId()); - } catch (IOException e) { - LOG.warn("Could not store container [" + container.getContainerId() - + "] state. The Container has been queued.", e); - } + } + + if (isQueued) { + try { + this.context.getNMStateStore().storeContainerQueued( + container.getContainerId()); + } catch (IOException e) { + LOG.warn("Could not store container [" + container.getContainerId() + + "] state. The Container has been queued.", e); + } + } + + return isQueued; + } + + @VisibleForTesting + protected void scheduleContainer(Container container) { + boolean isGuaranteedContainer = container.getContainerTokenIdentifier(). 
+ getExecutionType() == ExecutionType.GUARANTEED; + + // Given a guaranteed container, we enqueue it first and then try to start + // as many queuing guaranteed containers as possible followed by queuing + // opportunistic containers based on remaining resources available. If the + // container still stays in the queue afterwards, we need to preempt just + // enough number of opportunistic containers. + if (isGuaranteedContainer) { + enqueueContainer(container); + startPendingContainers(); + + // if the guaranteed container is queued, we need to preempt opportunistic + // containers to make room for it + if (queuedGuaranteedContainers.containsKey(container.getContainerId())) { + killOpportunisticContainers(container); + } + } else { + // Given an opportunistic container, we first try to start as many queuing + // guaranteed containers as possible followed by queuing opportunistic + // containers based on remaining resource available, then enqueue the + // opportunistic container. If the container is enqueued, we do another + // pass to try to start the newly enqueued opportunistic container. + startPendingContainers(); + boolean containerQueued = enqueueContainer(container); + // container may not get queued because the max opportunistic container + // queue length is reached. 
If so, there is no point doing another pass + if (containerQueued) { + startPendingContainers(); } } } @@ -292,7 +338,7 @@ public class ContainerScheduler extends AbstractService implements } } - private void startAllocatedContainer(Container container) { + private void startContainer(Container container) { LOG.info("Starting container [" + container.getContainerId()+ "]"); runningContainers.put(container.getContainerId(), container); this.utilizationTracker.addContainerResources(container); @@ -416,4 +462,5 @@ public class ContainerScheduler extends AbstractService implements public ContainersMonitor getContainersMonitor() { return this.context.getContainerManager().getContainersMonitor(); } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b007921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 075d857..b1a7b4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -583,7 +583,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { @Override public long getPmemAllocatedForContainers() { - 
return 10240; + return (long) 2048 << 20; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b007921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 8264f2e..aeba399 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -332,6 +332,91 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { } /** + * Starts one GUARANTEED container that takes up the whole node's resources, + * and submits more OPPORTUNISTIC containers than the opportunistic container + * queue can hold. OPPORTUNISTIC containers that cannot be queued should be + * killed. 
+ * @throws Exception + */ + @Test + public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + + final int maxOppQueueLength = conf.getInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, + YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH); + for (int i = 1; i < maxOppQueueLength + 2; i++) { + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + } + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(0), ContainerState.RUNNING, 40); + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(maxOppQueueLength + 1), ContainerState.DONE, + 40); + Thread.sleep(5000); + + // Get container statuses. 
Container 0 should be running and container + // 1 to maxOppQueueLength should be queued and the last container should + // be killed + List statList = new ArrayList<>(); + for (int i = 0; i < maxOppQueueLength + 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } else if (status.getContainerId().equals(createContainerId( + maxOppQueueLength + 1))) { + Assert.assertTrue(status.getDiagnostics().contains( + "Opportunistic container queue is full")); + } else { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED, + status.getState()); + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + ContainerScheduler containerScheduler = + containerManager.getContainerScheduler(); + Assert.assertEquals(maxOppQueueLength, + containerScheduler.getNumQueuedContainers()); + Assert.assertEquals(0, + containerScheduler.getNumQueuedGuaranteedContainers()); + Assert.assertEquals(maxOppQueueLength, + containerScheduler.getNumQueuedOpportunisticContainers()); + } + + /** * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources * requests by each container as such that only one can run in parallel. * Thus, the OPPORTUNISTIC container that started running, will be --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org