hadoop-yarn-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (YARN-4597) Add SCHEDULE to NM container lifecycle
Date Fri, 11 Nov 2016 22:19:59 GMT

    [ https://issues.apache.org/jira/browse/YARN-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658334#comment-15658334 ]

ASF GitHub Bot commented on YARN-4597:
--------------------------------------

Github user xslogic commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/143#discussion_r87668067
  
    --- Diff: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---
    @@ -0,0 +1,393 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import org.apache.hadoop.service.AbstractService;
    +import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
    +import org.apache.hadoop.yarn.api.records.ContainerId;
    +import org.apache.hadoop.yarn.api.records.ExecutionType;
    +import org.apache.hadoop.yarn.api.records.ResourceUtilization;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.event.EventHandler;
    +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
    +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
    +import org.apache.hadoop.yarn.server.nodemanager.Context;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * The ContainerScheduler manages a collection of runnable containers. It
    + * ensures that a container is launched only if all its launch criteria are
    + * met. It also ensures that OPPORTUNISTIC containers are killed to make
    + * room for GUARANTEED containers.
    + */
    +public class ContainerScheduler extends AbstractService implements
    +    EventHandler<ContainerSchedulerEvent> {
    +
    +  private static final Logger LOG =
    +      LoggerFactory.getLogger(ContainerScheduler.class);
    +
    +  private final Context context;
    +  private final int maxOppQueueLength;
    +
    +  // Queue of Guaranteed Containers waiting for resources to run
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedGuaranteedContainers = new LinkedHashMap<>();
    +  // Queue of Opportunistic Containers waiting for resources to run
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedOpportunisticContainers = new LinkedHashMap<>();
    +
    +  // Used to keep track of containers that have been marked to be killed
    +  // to make room for a guaranteed container.
    +  private final Map<ContainerId, Container> oppContainersMarkedForKill =
    +      new HashMap<>();
    +
    +  // Containers launched by the Scheduler will take a while to actually
    +  // move to the RUNNING state, but should still be fair game for killing
    +  // by the scheduler to make room for guaranteed containers.
    +  private final LinkedHashMap<ContainerId, Container> scheduledToRunContainers =
    +      new LinkedHashMap<>();
    +
    +  private final ContainerQueuingLimit queuingLimit =
    +      ContainerQueuingLimit.newInstance();
    +
    +  private final OpportunisticContainersStatus opportunisticContainersStatus;
    +
    +  // Resource Utilization Manager that decides how utilization of the cluster
    +  // increases / decreases based on container start / finish.
    +  private ResourceUtilizationManager utilizationManager;
    +
    +  /**
    +   * Instantiate a Container Scheduler.
    +   * @param context NodeManager Context.
    +   */
    +  public ContainerScheduler(Context context) {
    +    this(context, context.getConf().getInt(
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT));
    +  }
    +
    +  @VisibleForTesting
    +  public ContainerScheduler(Context context, int qLength) {
    +    super(ContainerScheduler.class.getName());
    +    this.context = context;
    +    this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
    +    this.utilizationManager = new ResourceUtilizationManager(this);
    +    this.opportunisticContainersStatus =
    +        OpportunisticContainersStatus.newInstance();
    +  }
    +
    +  /**
    +   * Handle ContainerSchedulerEvents.
    +   * @param event ContainerSchedulerEvent.
    +   */
    +  @Override
    +  public void handle(ContainerSchedulerEvent event) {
    +    switch (event.getType()) {
    +    case SCHEDULE_CONTAINER:
    +      scheduleContainer(event.getContainer());
    +      break;
    +    case CONTAINER_COMPLETED:
    +      onContainerCompleted(event.getContainer());
    +      break;
    +    default:
    +      LOG.error("Unknown event arrived at ContainerScheduler: "
    +          + event.toString());
    +    }
    +  }
    +
    +  /**
    +   * Return number of queued containers.
    +   * @return Number of queued containers.
    +   */
    +  public int getNumQueuedContainers() {
    +    return this.queuedGuaranteedContainers.size()
    +        + this.queuedOpportunisticContainers.size();
    +  }
    +
    +  @VisibleForTesting
    +  public int getNumQueuedGuaranteedContainers() {
    +    return this.queuedGuaranteedContainers.size();
    +  }
    +
    +  @VisibleForTesting
    +  public int getNumQueuedOpportunisticContainers() {
    +    return this.queuedOpportunisticContainers.size();
    +  }
    +
    +  public OpportunisticContainersStatus getOpportunisticContainersStatus() {
    +    this.opportunisticContainersStatus.setQueuedOpportContainers(
    +        getNumQueuedOpportunisticContainers());
    +    this.opportunisticContainersStatus.setWaitQueueLength(
    +        getNumQueuedContainers());
    +    return this.opportunisticContainersStatus;
    +  }
    +
    +  private void onContainerCompleted(Container container) {
    +    // decrement only if it was a running container
    +    if (scheduledToRunContainers.containsKey(container.getContainerId())) {
    +      this.utilizationManager.subtractContainerResource(container);
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.OPPORTUNISTIC) {
    +        this.opportunisticContainersStatus.setOpportMemoryUsed(
    +            this.opportunisticContainersStatus.getOpportMemoryUsed()
    +                - container.getResource().getMemorySize());
    +        this.opportunisticContainersStatus.setOpportCoresUsed(
    +            this.opportunisticContainersStatus.getOpportCoresUsed()
    +                - container.getResource().getVirtualCores());
    +        this.opportunisticContainersStatus.setRunningOpportContainers(
    +            this.opportunisticContainersStatus.getRunningOpportContainers()
    +                - 1);
    +      }
    +    }
    +    scheduledToRunContainers.remove(container.getContainerId());
    +    oppContainersMarkedForKill.remove(container.getContainerId());
    +    startPendingContainers();
    +  }
    +
    +  private void startPendingContainers() {
    +    // Start pending guaranteed containers, if resources available.
    +    boolean resourcesAvailable =
    +        startContainersFromQueue(queuedGuaranteedContainers.values());
    +    // Start opportunistic containers, if resources available.
    +    if (resourcesAvailable) {
    +      startContainersFromQueue(queuedOpportunisticContainers.values());
    +    }
    +  }
    +
    +  private boolean startContainersFromQueue(
    +      Collection<Container> queuedContainers) {
    +    Iterator<Container> cIter = queuedContainers.iterator();
    +    boolean resourcesAvailable = true;
    +    while (cIter.hasNext() && resourcesAvailable) {
    +      Container container = cIter.next();
    +      if (this.utilizationManager.hasResourcesAvailable(container)) {
    +        startAllocatedContainer(container);
    +        cIter.remove();
    +      } else {
    +        resourcesAvailable = false;
    +      }
    +    }
    +    return resourcesAvailable;
    +  }
    +
    +  @VisibleForTesting
    +  protected void scheduleContainer(Container container) {
    +    if (maxOppQueueLength <= 0) {
    +      startAllocatedContainer(container);
    +      return;
    +    }
    +    if (queuedGuaranteedContainers.isEmpty() &&
    +        queuedOpportunisticContainers.isEmpty() &&
    +        this.utilizationManager.hasResourcesAvailable(container)) {
    +      startAllocatedContainer(container);
    +    } else {
    +      LOG.info("No available resources for container {} to start its execution "
    +          + "immediately.", container.getContainerId());
    +      boolean isQueued = true;
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.GUARANTEED) {
    +        queuedGuaranteedContainers.put(container.getContainerId(), container);
    +        // Kill running opportunistic containers to make space for
    +        // guaranteed container.
    +        killOpportunisticContainers(container);
    +      } else {
    +        if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
    +          LOG.info("Opportunistic container {} will be queued at the NM.",
    +              container.getContainerId());
    +          queuedOpportunisticContainers.put(
    +              container.getContainerId(), container);
    +        } else {
    +          isQueued = false;
    +          LOG.info("Opportunistic container [{}] will not be queued at the NM" +
    +              "since max queue length [{}] has been reached",
    +              container.getContainerId(), maxOppQueueLength);
    +          container.sendKillEvent(
    +              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +              "Opportunistic container queue is full.");
    +        }
    +      }
    +      if (isQueued) {
    +        try {
    +          this.context.getNMStateStore().storeContainerQueued(
    +              container.getContainerId());
    +        } catch (IOException e) {
    +          LOG.warn("Could not store container state into store..", e);
    +        }
    +      }
    +    }
    +  }
    +
    +  private void killOpportunisticContainers(Container container) {
    +    List<Container> extraOpportContainersToKill =
    +        pickOpportunisticContainersToKill(container.getContainerId());
    +    // Kill the opportunistic containers that were chosen.
    +    for (Container contToKill : extraOpportContainersToKill) {
    +      contToKill.sendKillEvent(
    +          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +          "Container Killed to make room for Guaranteed Container.");
    +      oppContainersMarkedForKill.put(contToKill.getContainerId(), contToKill);
    +      LOG.info(
    +          "Opportunistic container {} will be killed in order to start the "
    +              + "execution of guaranteed container {}.",
    +          contToKill.getContainerId(), container.getContainerId());
    +    }
    +  }
    +
    +  private void startAllocatedContainer(Container container) {
    +    LOG.info("Starting container [" + container.getContainerId()+ "]");
    +    scheduledToRunContainers.put(container.getContainerId(), container);
    +    this.utilizationManager.addContainerResources(container);
    +    if (container.getContainerTokenIdentifier().getExecutionType() ==
    +        ExecutionType.OPPORTUNISTIC) {
    +      this.opportunisticContainersStatus.setOpportMemoryUsed(
    +          this.opportunisticContainersStatus.getOpportMemoryUsed()
    +              + container.getResource().getMemorySize());
    +      this.opportunisticContainersStatus.setOpportCoresUsed(
    +          this.opportunisticContainersStatus.getOpportCoresUsed()
    +              + container.getResource().getVirtualCores());
    +      this.opportunisticContainersStatus.setRunningOpportContainers(
    +          this.opportunisticContainersStatus.getRunningOpportContainers()
    +              + 1);
    +    }
    +    container.sendLaunchEvent();
    +  }
    +
    +  private List<Container> pickOpportunisticContainersToKill(
    +      ContainerId containerToStartId) {
    +    // The additional opportunistic containers that need to be killed for the
    +    // given container to start.
    +    List<Container> extraOpportContainersToKill = new ArrayList<>();
    +    // Track resources that need to be freed.
    +    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
    +        containerToStartId);
    +
    +    // Go over the running opportunistic containers.
    +    // Use a descending iterator to kill more recently started containers.
    +    Iterator<Container> reverseContainerIterator =
    +        new LinkedList<>(scheduledToRunContainers.values()).descendingIterator();
    +    while (reverseContainerIterator.hasNext() &&
    +        !hasSufficientResources(resourcesToFreeUp)) {
    +      Container runningCont = reverseContainerIterator.next();
    +      if (runningCont.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.OPPORTUNISTIC) {
    +
    +        if (oppContainersMarkedForKill.containsKey(
    +            runningCont.getContainerId())) {
    +          // These containers have already been marked to be killed.
    +          // So exclude them..
    +          continue;
    +        }
    +        extraOpportContainersToKill.add(runningCont);
    +        ResourceUtilizationManager.decreaseResourceUtilization(
    +            getContainersMonitor(), resourcesToFreeUp,
    +            runningCont.getResource());
    +      }
    +    }
    +    if (!hasSufficientResources(resourcesToFreeUp)) {
    +      LOG.warn("There are no sufficient resources to start guaranteed [{}]" +
    --- End diff --
    
    Actually, that message might be a bit misleading. It should be re-worded to 'There are no sufficient resources at the moment to start guaranteed...'. You can only reach this point if kill signals have already been sent to opportunistic containers but the completed-container events haven't been received by the Scheduler yet; once they are processed, the guaranteed container will start.
    It is never possible to allocate a guaranteed container on a node that cannot start it.
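    For illustration, a minimal sketch of how the re-worded warning could look. It reuses the hasSufficientResources / resourcesToFreeUp / containerToStartId names from the diff above; the exact phrasing is only a suggestion, not the final wording:

        if (!hasSufficientResources(resourcesToFreeUp)) {
          // Reaching this point means kill signals were already sent to opportunistic
          // containers; the guaranteed container will start once their completion
          // events are processed by the scheduler.
          LOG.warn("There are no sufficient resources at the moment to start"
              + " guaranteed container [{}]; waiting for opportunistic containers"
              + " marked for kill to complete.", containerToStartId);
        }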



> Add SCHEDULE to NM container lifecycle
> --------------------------------------
>
>                 Key: YARN-4597
>                 URL: https://issues.apache.org/jira/browse/YARN-4597
>             Project: Hadoop YARN
>          Issue Type: New Feature
>          Components: nodemanager
>            Reporter: Chris Douglas
>            Assignee: Arun Suresh
>              Labels: oct16-hard
>         Attachments: YARN-4597.001.patch, YARN-4597.002.patch, YARN-4597.003.patch, YARN-4597.004.patch, YARN-4597.005.patch, YARN-4597.006.patch, YARN-4597.007.patch, YARN-4597.008.patch, YARN-4597.009.patch
>
>
> Currently, the NM immediately launches containers after resource localization. Several features could be more cleanly implemented if the NM included a separate stage for reserving resources.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
