Date: Fri, 11 Nov 2016 19:40:08 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: yarn-issues@hadoop.apache.org
Subject: [jira] [Commented] (YARN-4597) Add SCHEDULE to NM container lifecycle
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 7bit

    [ https://issues.apache.org/jira/browse/YARN-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657943#comment-15657943 ]

ASF GitHub Bot commented on YARN-4597:
--------------------------------------

Github user kambatla commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/143#discussion_r87641557

    --- Diff: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---
    @@ -0,0 +1,393 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import org.apache.hadoop.service.AbstractService;
    +import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
    +import org.apache.hadoop.yarn.api.records.ContainerId;
    +import org.apache.hadoop.yarn.api.records.ExecutionType;
    +import org.apache.hadoop.yarn.api.records.ResourceUtilization;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.event.EventHandler;
    +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
    +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
    +import org.apache.hadoop.yarn.server.nodemanager.Context;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * The ContainerScheduler manages a collection of runnable containers. It
    + * ensures that a container is launched only if all its launch criteria are
    + * met. It also ensures that OPPORTUNISTIC containers are killed to make
    + * room for GUARANTEED containers.
    + */
    +public class ContainerScheduler extends AbstractService implements
    +    EventHandler<ContainerSchedulerEvent> {
    +
    +  private static final Logger LOG =
    +      LoggerFactory.getLogger(ContainerScheduler.class);
    +
    +  private final Context context;
    +  private final int maxOppQueueLength;
    +
    +  // Queue of Guaranteed Containers waiting for resources to run
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedGuaranteedContainers = new LinkedHashMap<>();
    +  // Queue of Opportunistic Containers waiting for resources to run
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedOpportunisticContainers = new LinkedHashMap<>();
    +
    +  // Used to keep track of containers that have been marked to be killed
    +  // to make room for a guaranteed container.
    +  private final Map<ContainerId, Container> oppContainersMarkedForKill =
    +      new HashMap<>();
    +
    +  // Containers launched by the Scheduler will take a while to actually
    +  // move to the RUNNING state, but should still be fair game for killing
    +  // by the scheduler to make room for guaranteed containers.
    +  private final LinkedHashMap<ContainerId, Container> scheduledToRunContainers =
    +      new LinkedHashMap<>();
    +
    +  private final ContainerQueuingLimit queuingLimit =
    +      ContainerQueuingLimit.newInstance();
    +
    +  private final OpportunisticContainersStatus opportunisticContainersStatus;
    +
    +  // Resource Utilization Manager that decides how utilization of the cluster
    +  // increases / decreases based on container start / finish
    +  private ResourceUtilizationManager utilizationManager;
    +
    +  /**
    +   * Instantiate a Container Scheduler.
    +   * @param context NodeManager Context.
    +   */
    +  public ContainerScheduler(Context context) {
    +    this(context, context.getConf().getInt(
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT));
    +  }
    +
    +  @VisibleForTesting
    +  public ContainerScheduler(Context context, int qLength) {
    +    super(ContainerScheduler.class.getName());
    +    this.context = context;
    +    this.maxOppQueueLength = (qLength <= 0) ?
    +        0 : qLength;
    +    this.utilizationManager = new ResourceUtilizationManager(this);
    +    this.opportunisticContainersStatus =
    +        OpportunisticContainersStatus.newInstance();
    +  }
    +
    +  /**
    +   * Handle ContainerSchedulerEvents.
    +   * @param event ContainerSchedulerEvent.
    +   */
    +  @Override
    +  public void handle(ContainerSchedulerEvent event) {
    +    switch (event.getType()) {
    +    case SCHEDULE_CONTAINER:
    +      scheduleContainer(event.getContainer());
    +      break;
    +    case CONTAINER_COMPLETED:
    +      onContainerCompleted(event.getContainer());
    +      break;
    +    default:
    +      LOG.error("Unknown event arrived at ContainerScheduler: "
    +          + event.toString());
    +    }
    +  }
    +
    +  /**
    +   * Return number of queued containers.
    +   * @return Number of queued containers.
    +   */
    +  public int getNumQueuedContainers() {
    +    return this.queuedGuaranteedContainers.size()
    +        + this.queuedOpportunisticContainers.size();
    +  }
    +
    +  @VisibleForTesting
    +  public int getNumQueuedGuaranteedContainers() {
    +    return this.queuedGuaranteedContainers.size();
    +  }
    +
    +  @VisibleForTesting
    +  public int getNumQueuedOpportunisticContainers() {
    +    return this.queuedOpportunisticContainers.size();
    +  }
    +
    +  public OpportunisticContainersStatus getOpportunisticContainersStatus() {
    +    this.opportunisticContainersStatus.setQueuedOpportContainers(
    +        getNumQueuedOpportunisticContainers());
    +    this.opportunisticContainersStatus.setWaitQueueLength(
    +        getNumQueuedContainers());
    +    return this.opportunisticContainersStatus;
    +  }
    +
    +  private void onContainerCompleted(Container container) {
    +    // decrement only if it was a running container
    +    if (scheduledToRunContainers.containsKey(container.getContainerId())) {
    +      this.utilizationManager.subtractContainerResource(container);
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.OPPORTUNISTIC) {
    +        this.opportunisticContainersStatus.setOpportMemoryUsed(
    +            this.opportunisticContainersStatus.getOpportMemoryUsed()
    +                - container.getResource().getMemorySize());
    +        this.opportunisticContainersStatus.setOpportCoresUsed(
    +            this.opportunisticContainersStatus.getOpportCoresUsed()
    +                - container.getResource().getVirtualCores());
    +        this.opportunisticContainersStatus.setRunningOpportContainers(
    +            this.opportunisticContainersStatus.getRunningOpportContainers()
    +                - 1);
    +      }
    +    }
    +    scheduledToRunContainers.remove(container.getContainerId());
    +    oppContainersMarkedForKill.remove(container.getContainerId());
    +    startPendingContainers();
    +  }
    +
    +  private void startPendingContainers() {
    +    // Start pending guaranteed containers, if resources available.
    +    boolean resourcesAvailable =
    +        startContainersFromQueue(queuedGuaranteedContainers.values());
    +    // Start opportunistic containers, if resources available.
    +    if (resourcesAvailable) {
    +      startContainersFromQueue(queuedOpportunisticContainers.values());
    +    }
    +  }
    +
    +  private boolean startContainersFromQueue(
    +      Collection<Container> queuedContainers) {
    +    Iterator<Container> cIter = queuedContainers.iterator();
    +    boolean resourcesAvailable = true;
    +    while (cIter.hasNext() && resourcesAvailable) {
    +      Container container = cIter.next();
    +      if (this.utilizationManager.hasResourcesAvailable(container)) {
    +        startAllocatedContainer(container);
    +        cIter.remove();
    +      } else {
    +        resourcesAvailable = false;
    +      }
    +    }
    +    return resourcesAvailable;
    +  }
    +
    +  @VisibleForTesting
    +  protected void scheduleContainer(Container container) {
    +    if (maxOppQueueLength <= 0) {
    +      startAllocatedContainer(container);
    +      return;
    +    }
    +    if (queuedGuaranteedContainers.isEmpty() &&
    +        queuedOpportunisticContainers.isEmpty() &&
    +        this.utilizationManager.hasResourcesAvailable(container)) {
    +      startAllocatedContainer(container);
    +    } else {
    +      LOG.info("No available resources for container {} to start its execution "
    +          + "immediately.", container.getContainerId());
    +      boolean isQueued = true;
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.GUARANTEED) {
    +        queuedGuaranteedContainers.put(container.getContainerId(), container);
    +        // Kill running opportunistic containers to make space for
    +        // guaranteed container.
    +        killOpportunisticContainers(container);
    +      } else {
    +        if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
    +          LOG.info("Opportunistic container {} will be queued at the NM.",
    +              container.getContainerId());
    +          queuedOpportunisticContainers.put(
    +              container.getContainerId(), container);
    +        } else {
    +          isQueued = false;
    +          LOG.info("Opportunistic container [{}] will not be queued at the NM "
    +              + "since max queue length [{}] has been reached",
    +              container.getContainerId(), maxOppQueueLength);
    +          container.sendKillEvent(
    +              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +              "Opportunistic container queue is full.");
    +        }
    +      }
    +      if (isQueued) {
    +        try {
    +          this.context.getNMStateStore().storeContainerQueued(
    +              container.getContainerId());
    +        } catch (IOException e) {
    +          LOG.warn("Could not store container state into store..", e);
    +        }
    +      }
    +    }
    +  }
    +
    +  private void killOpportunisticContainers(Container container) {
    +    List<Container> extraOpportContainersToKill =
    +        pickOpportunisticContainersToKill(container.getContainerId());
    +    // Kill the opportunistic containers that were chosen.
    +    for (Container contToKill : extraOpportContainersToKill) {
    +      contToKill.sendKillEvent(
    +          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +          "Container Killed to make room for Guaranteed Container.");
    +      oppContainersMarkedForKill.put(contToKill.getContainerId(), contToKill);
    +      LOG.info(
    +          "Opportunistic container {} will be killed in order to start the "
    +              + "execution of guaranteed container {}.",
    +          contToKill.getContainerId(), container.getContainerId());
    +    }
    +  }
    +
    +  private void startAllocatedContainer(Container container) {
    +    LOG.info("Starting container [" + container.getContainerId() + "]");
    +    scheduledToRunContainers.put(container.getContainerId(), container);
    +    this.utilizationManager.addContainerResources(container);
    +    if (container.getContainerTokenIdentifier().getExecutionType() ==
    +        ExecutionType.OPPORTUNISTIC) {
    +      this.opportunisticContainersStatus.setOpportMemoryUsed(
    +          this.opportunisticContainersStatus.getOpportMemoryUsed()
    +              + container.getResource().getMemorySize());
    +      this.opportunisticContainersStatus.setOpportCoresUsed(
    +          this.opportunisticContainersStatus.getOpportCoresUsed()
    +              + container.getResource().getVirtualCores());
    +      this.opportunisticContainersStatus.setRunningOpportContainers(
    +          this.opportunisticContainersStatus.getRunningOpportContainers()
    +              + 1);
    +    }
    +    container.sendLaunchEvent();
    +  }
    +
    +  private List<Container> pickOpportunisticContainersToKill(
    +      ContainerId containerToStartId) {
    +    // The additional opportunistic containers that need to be killed for the
    +    // given container to start.
    +    List<Container> extraOpportContainersToKill = new ArrayList<>();
    +    // Track resources that need to be freed.
    +    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
    +        containerToStartId);
    +
    +    // Go over the running opportunistic containers.
    +    // Use a descending iterator to kill more recently started containers.
    +    Iterator<Container> reverseContainerIterator =
    +        new LinkedList<>(scheduledToRunContainers.values()).descendingIterator();
    +    while (reverseContainerIterator.hasNext() &&
    +        !hasSufficientResources(resourcesToFreeUp)) {
    +      Container runningCont = reverseContainerIterator.next();
    +      if (runningCont.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.OPPORTUNISTIC) {
    +
    +        if (oppContainersMarkedForKill.containsKey(
    +            runningCont.getContainerId())) {
    +          // These containers have already been marked to be killed.
    +          // So exclude them..
    +          continue;
    +        }
    +        extraOpportContainersToKill.add(runningCont);
    +        ResourceUtilizationManager.decreaseResourceUtilization(
    +            getContainersMonitor(), resourcesToFreeUp,
    +            runningCont.getResource());
    +      }
    +    }
    +    if (!hasSufficientResources(resourcesToFreeUp)) {
    +      LOG.warn("There are not sufficient resources to start guaranteed [{}] "
    +          + "even after attempting to kill all running "
    +          + "opportunistic containers.", containerToStartId);
    +    }
    +    return extraOpportContainersToKill;
    +  }
    +
    +  private boolean hasSufficientResources(
    +      ResourceUtilization resourcesToFreeUp) {
    +    return resourcesToFreeUp.getPhysicalMemory() <= 0 &&
    +        resourcesToFreeUp.getVirtualMemory() <= 0 &&
    +        resourcesToFreeUp.getCPU() <= 0.0f;
    +  }
    +
    +  private ResourceUtilization resourcesToFreeUp(
    +      ContainerId containerToStartId) {
    +    // Get allocation of currently allocated containers.
    +    ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
    +        .newInstance(this.utilizationManager.getCurrentUtilization());
    +
    +    // Add to the allocation the allocation of the pending guaranteed
    +    // containers that will start before the current container will be started.
    +    for (Container container : queuedGuaranteedContainers.values()) {
    +      ResourceUtilizationManager.increaseResourceUtilization(
    +          getContainersMonitor(), resourceAllocationToFreeUp,
    +          container.getResource());
    +      if (container.getContainerId().equals(containerToStartId)) {
    +        break;
    +      }
    +    }
    +
    +    // These Resources have already been freed, due to demand from an
    --- End diff --
    
    Rephrase? "These resources are being freed, likely at the behest of another guaranteed container."?

> Add SCHEDULE to NM container lifecycle
> --------------------------------------
>
>                 Key: YARN-4597
>                 URL: https://issues.apache.org/jira/browse/YARN-4597
>             Project: Hadoop YARN
>          Issue Type: New Feature
>          Components: nodemanager
>            Reporter: Chris Douglas
>            Assignee: Arun Suresh
>              Labels: oct16-hard
>         Attachments: YARN-4597.001.patch, YARN-4597.002.patch, YARN-4597.003.patch, YARN-4597.004.patch, YARN-4597.005.patch, YARN-4597.006.patch, YARN-4597.007.patch, YARN-4597.008.patch, YARN-4597.009.patch
>
>
> Currently, the NM immediately launches containers after resource localization. Several features could be more cleanly implemented if the NM included a separate stage for reserving resources.
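For readers skimming the archived diff, the following is a minimal, self-contained sketch of the queue-or-kill policy the ContainerScheduler above implements: start a container if it fits, queue GUARANTEED work and kill the most recently started OPPORTUNISTIC work to make room, and queue OPPORTUNISTIC work only up to a bounded length. The SchedulerSketch and Task types and the single memory dimension are hypothetical simplifications for illustration, not the Hadoop NodeManager API.

    // Hypothetical, simplified illustration of the ContainerScheduler's decision flow.
    import java.util.ArrayDeque;
    import java.util.Deque;

    public class SchedulerSketch {

      enum ExecType { GUARANTEED, OPPORTUNISTIC }

      // Hypothetical stand-in for an NM container: an id, an execution type and a memory demand.
      record Task(String id, ExecType type, long memMB) { }

      private final long totalMemMB;          // capacity the node can promise
      private long usedMemMB = 0;             // memory of tasks currently running
      private final int maxOppQueueLength;    // cap on queued OPPORTUNISTIC tasks
      private final Deque<Task> running = new ArrayDeque<>();
      private final Deque<Task> queuedGuaranteed = new ArrayDeque<>();
      private final Deque<Task> queuedOpportunistic = new ArrayDeque<>();

      SchedulerSketch(long totalMemMB, int maxOppQueueLength) {
        this.totalMemMB = totalMemMB;
        this.maxOppQueueLength = maxOppQueueLength;
      }

      void schedule(Task t) {
        // Fast path: nothing queued and the task fits -> start it immediately.
        if (queuedGuaranteed.isEmpty() && queuedOpportunistic.isEmpty()
            && usedMemMB + t.memMB() <= totalMemMB) {
          start(t);
          return;
        }
        if (t.type() == ExecType.GUARANTEED) {
          queuedGuaranteed.addLast(t);
          // Kill the most recently started OPPORTUNISTIC tasks until the demand fits.
          long toFree = usedMemMB + t.memMB() - totalMemMB;
          var it = running.descendingIterator();
          while (it.hasNext() && toFree > 0) {
            Task victim = it.next();
            if (victim.type() == ExecType.OPPORTUNISTIC) {
              it.remove();
              usedMemMB -= victim.memMB();
              toFree -= victim.memMB();
              System.out.println("killed " + victim.id() + " to make room");
            }
          }
          startPending();
        } else if (queuedOpportunistic.size() < maxOppQueueLength) {
          queuedOpportunistic.addLast(t);       // wait at the NM for resources to free up
        } else {
          System.out.println("rejected " + t.id() + ": opportunistic queue is full");
        }
      }

      private void startPending() {
        // Guaranteed tasks first, then opportunistic, while resources last.
        drain(queuedGuaranteed);
        drain(queuedOpportunistic);
      }

      private void drain(Deque<Task> queue) {
        while (!queue.isEmpty() && usedMemMB + queue.peekFirst().memMB() <= totalMemMB) {
          start(queue.pollFirst());
        }
      }

      private void start(Task t) {
        running.addLast(t);
        usedMemMB += t.memMB();
        System.out.println("started " + t.id());
      }

      public static void main(String[] args) {
        SchedulerSketch s = new SchedulerSketch(4096, 2);
        s.schedule(new Task("opp-1", ExecType.OPPORTUNISTIC, 3072)); // starts immediately
        s.schedule(new Task("gua-1", ExecType.GUARANTEED, 2048));    // kills opp-1, then starts
      }
    }

The real scheduler tracks CPU and memory through ResourceUtilizationManager and defers kills to asynchronous container events; the sketch only mirrors the ordering of the decisions, not the NM's event-driven mechanics.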