hadoop-yarn-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (YARN-4597) Add SCHEDULE to NM container lifecycle
Date Fri, 11 Nov 2016 22:01:59 GMT

    [ https://issues.apache.org/jira/browse/YARN-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658300#comment-15658300 ]

ASF GitHub Bot commented on YARN-4597:
--------------------------------------

Github user xslogic commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/143#discussion_r87666090
  
    --- Diff: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
---
    @@ -0,0 +1,393 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import org.apache.hadoop.service.AbstractService;
    +import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
    +import org.apache.hadoop.yarn.api.records.ContainerId;
    +import org.apache.hadoop.yarn.api.records.ExecutionType;
    +import org.apache.hadoop.yarn.api.records.ResourceUtilization;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.event.EventHandler;
    +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
    +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
    +import org.apache.hadoop.yarn.server.nodemanager.Context;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * The ContainerScheduler manages a collection of runnable containers. It
    + * ensures that a container is launched only if all its launch criteria are
    + * met. It also ensures that OPPORTUNISTIC containers are killed to make
    + * room for GUARANTEED containers.
    + */
    +public class ContainerScheduler extends AbstractService implements
    +    EventHandler<ContainerSchedulerEvent> {
    +
    +  private static final Logger LOG =
    +      LoggerFactory.getLogger(ContainerScheduler.class);
    +
    +  // NodeManager context; also used to reach the NM state store.
    +  private final Context context;
    +  // Maximum OPPORTUNISTIC queue length. Negative values are clamped to 0,
    +  // and 0 means containers are started immediately instead of being queued.
    +  private final int maxOppQueueLength;
    +
    +  // Queue of GUARANTEED containers waiting for resources to run, kept in
    +  // arrival order.
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedGuaranteedContainers = new LinkedHashMap<>();
    +  // Queue of OPPORTUNISTIC containers waiting for resources to run, kept in
    +  // arrival order.
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedOpportunisticContainers = new LinkedHashMap<>();
    +
    +  // Containers that have been marked to be killed to make room for a
    +  // GUARANTEED container.
    +  private final Map<ContainerId, Container> oppContainersMarkedForKill =
    +      new HashMap<>();
    +
    +  // Containers launched by the scheduler take a while to actually move to
    +  // the RUNNING state, but should still be fair game for killing by the
    +  // scheduler to make room for GUARANTEED containers.
    +  private final LinkedHashMap<ContainerId, Container>
    +      scheduledToRunContainers = new LinkedHashMap<>();
    +
    +  private final ContainerQueuingLimit queuingLimit =
    +      ContainerQueuingLimit.newInstance();
    +
    +  // Queue sizes and OPPORTUNISTIC memory / vcore / container counters
    +  // reported through getOpportunisticContainersStatus().
    +  private final OpportunisticContainersStatus opportunisticContainersStatus;
    +
    +  // Tracks resource utilization as containers start and finish, and decides
    +  // whether enough resources remain to start another container.
    +  private ResourceUtilizationManager utilizationManager;
    +
    +  /**
    +   * Creates a scheduler whose OPPORTUNISTIC queue length is read from the
    +   * NodeManager configuration key
    +   * {@link YarnConfiguration#NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH},
    +   * using its {@code _DEFAULT} companion as the fallback value.
    +   * @param context NodeManager Context, also the source of the configuration.
    +   */
    +  public ContainerScheduler(Context context) {
    +    this(context, configuredMaxOppQueueLength(context));
    +  }
    +
    +  /**
    +   * Reads the configured maximum OPPORTUNISTIC queue length.
    +   * @param ctx NodeManager Context whose configuration is consulted.
    +   * @return the configured length, or the YARN default fallback.
    +   */
    +  private static int configuredMaxOppQueueLength(Context ctx) {
    +    return ctx.getConf().getInt(
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT);
    +  }
    +
    +  /**
    +   * Creates a scheduler with an explicit OPPORTUNISTIC queue length.
    +   * Non-positive lengths are normalised to 0, which makes
    +   * {@code scheduleContainer} start containers immediately without queuing.
    +   * @param context NodeManager Context.
    +   * @param qLength requested maximum OPPORTUNISTIC queue length.
    +   */
    +  @VisibleForTesting
    +  public ContainerScheduler(Context context, int qLength) {
    +    super(ContainerScheduler.class.getName());
    +    this.context = context;
    +    // Clamp negatives so later "<= 0" checks only ever see 0 as the sentinel.
    +    this.maxOppQueueLength = Math.max(0, qLength);
    +    this.utilizationManager = new ResourceUtilizationManager(this);
    +    this.opportunisticContainersStatus =
    +        OpportunisticContainersStatus.newInstance();
    +  }
    +
    +  /**
    +   * Dispatches a ContainerSchedulerEvent to the matching handler:
    +   * SCHEDULE_CONTAINER hands the event's container to scheduleContainer,
    +   * CONTAINER_COMPLETED hands it to onContainerCompleted. Any other event
    +   * type is logged as an error and otherwise ignored.
    +   * @param event ContainerSchedulerEvent to process.
    +   */
    +  @Override
    +  public void handle(ContainerSchedulerEvent event) {
    +    switch (event.getType()) {
    +    case SCHEDULE_CONTAINER:
    +      scheduleContainer(event.getContainer());
    +      return;
    +    case CONTAINER_COMPLETED:
    +      onContainerCompleted(event.getContainer());
    +      return;
    +    default:
    +      LOG.error("Unknown event arrived at ContainerScheduler: "
    +          + event.toString());
    +      return;
    +    }
    +  }
    +
    +  /**
    +   * Return number of queued containers.
    +   * @return Number of queued containers.
    +   */
    +  public int getNumQueuedContainers() {
    +    return this.queuedGuaranteedContainers.size()
    +        + this.queuedOpportunisticContainers.size();
    +  }
    +
    +  /**
    +   * Returns how many GUARANTEED containers are waiting for resources.
    +   * @return size of the GUARANTEED queue.
    +   */
    +  @VisibleForTesting
    +  public int getNumQueuedGuaranteedContainers() {
    +    final int waiting = queuedGuaranteedContainers.size();
    +    return waiting;
    +  }
    +
    +  /**
    +   * Returns how many OPPORTUNISTIC containers are waiting for resources.
    +   * @return size of the OPPORTUNISTIC queue.
    +   */
    +  @VisibleForTesting
    +  public int getNumQueuedOpportunisticContainers() {
    +    final int waiting = queuedOpportunisticContainers.size();
    +    return waiting;
    +  }
    +
    +  /**
    +   * Refreshes the queue-related counters of the scheduler's status object
    +   * and returns it. The same instance is returned on every call.
    +   * @return the status with the OPPORTUNISTIC queue size and the total
    +   *     wait-queue length (GUARANTEED + OPPORTUNISTIC) filled in.
    +   */
    +  public OpportunisticContainersStatus getOpportunisticContainersStatus() {
    +    final OpportunisticContainersStatus status =
    +        this.opportunisticContainersStatus;
    +    status.setQueuedOpportContainers(getNumQueuedOpportunisticContainers());
    +    status.setWaitQueueLength(getNumQueuedContainers());
    +    return status;
    +  }
    +
    +  /**
    +   * Releases the bookkeeping held for a finished container, then tries to
    +   * start queued containers. Resource and OPPORTUNISTIC usage counters are
    +   * decremented only when the container is tracked in
    +   * scheduledToRunContainers, i.e. it was launched by this scheduler.
    +   * @param container the container that completed.
    +   */
    +  private void onContainerCompleted(Container container) {
    +    ContainerId finishedId = container.getContainerId();
    +    boolean wasScheduledToRun =
    +        scheduledToRunContainers.containsKey(finishedId);
    +    if (wasScheduledToRun) {
    +      utilizationManager.subtractContainerResource(container);
    +      if (container.getContainerTokenIdentifier().getExecutionType()
    +          == ExecutionType.OPPORTUNISTIC) {
    +        releaseOpportunisticUsage(container);
    +      }
    +    }
    +    scheduledToRunContainers.remove(finishedId);
    +    oppContainersMarkedForKill.remove(finishedId);
    +    startPendingContainers();
    +  }
    +
    +  /**
    +   * Subtracts a finished OPPORTUNISTIC container's memory and vcores, and
    +   * the container itself, from the running-OPPORTUNISTIC totals.
    +   * @param container the completed OPPORTUNISTIC container.
    +   */
    +  private void releaseOpportunisticUsage(Container container) {
    +    OpportunisticContainersStatus status = opportunisticContainersStatus;
    +    status.setOpportMemoryUsed(status.getOpportMemoryUsed()
    +        - container.getResource().getMemorySize());
    +    status.setOpportCoresUsed(status.getOpportCoresUsed()
    +        - container.getResource().getVirtualCores());
    +    status.setRunningOpportContainers(
    +        status.getRunningOpportContainers() - 1);
    +  }
    +
    +  /**
    +   * Starts as many queued containers as current resources allow.
    +   * GUARANTEED containers are drained first; the OPPORTUNISTIC queue is only
    +   * consulted when every queued GUARANTEED container could be started.
    +   */
    +  private void startPendingContainers() {
    +    boolean guaranteedQueueDrained =
    +        startContainersFromQueue(queuedGuaranteedContainers.values());
    +    if (!guaranteedQueueDrained) {
    +      return;
    +    }
    +    startContainersFromQueue(queuedOpportunisticContainers.values());
    +  }
    +
    +  /**
    +   * Starts containers from the head of the given queue, in iteration order,
    +   * until one does not fit in the available resources. Started containers
    +   * are removed through the iterator, so the collection must support
    +   * {@link Iterator#remove()} (map value views do).
    +   * @param queuedContainers queue to drain, e.g. a map's values() view.
    +   * @return true if every container was started (including an empty queue),
    +   *     false if one was left waiting for resources.
    +   */
    +  private boolean startContainersFromQueue(
    +      Collection<Container> queuedContainers) {
    +    for (Iterator<Container> it = queuedContainers.iterator();
    +        it.hasNext();) {
    +      Container candidate = it.next();
    +      if (!utilizationManager.hasResourcesAvailable(candidate)) {
    +        // Stop at the first container that does not fit rather than
    +        // skipping ahead, preserving queue order.
    +        return false;
    +      }
    +      startAllocatedContainer(candidate);
    +      it.remove();
    +    }
    +    return true;
    +  }
    +
    +  @VisibleForTesting
    +  protected void scheduleContainer(Container container) {
    +    if (maxOppQueueLength <= 0) {
    +      startAllocatedContainer(container);
    +      return;
    +    }
    +    if (queuedGuaranteedContainers.isEmpty() &&
    +        queuedOpportunisticContainers.isEmpty() &&
    +        this.utilizationManager.hasResourcesAvailable(container)) {
    +      startAllocatedContainer(container);
    +    } else {
    +      LOG.info("No available resources for container {} to start its execution "
    +          + "immediately.", container.getContainerId());
    +      boolean isQueued = true;
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.GUARANTEED) {
    +        queuedGuaranteedContainers.put(container.getContainerId(), container);
    +        // Kill running opportunistic containers to make space for
    +        // guaranteed container.
    +        killOpportunisticContainers(container);
    +      } else {
    +        if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
    +          LOG.info("Opportunistic container {} will be queued at the NM.",
    +              container.getContainerId());
    +          queuedOpportunisticContainers.put(
    +              container.getContainerId(), container);
    +        } else {
    +          isQueued = false;
    +          LOG.info("Opportunistic container [{}] will not be queued at the NM" +
    +              "since max queue length [{}] has been reached",
    +              container.getContainerId(), maxOppQueueLength);
    +          container.sendKillEvent(
    +              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +              "Opportunistic container queue is full.");
    +        }
    +      }
    +      if (isQueued) {
    +        try {
    +          this.context.getNMStateStore().storeContainerQueued(
    +              container.getContainerId());
    +        } catch (IOException e) {
    +          LOG.warn("Could not store container state into store..", e);
    --- End diff --
    
    Will modify the error message...


> Add SCHEDULE to NM container lifecycle
> --------------------------------------
>
>                 Key: YARN-4597
>                 URL: https://issues.apache.org/jira/browse/YARN-4597
>             Project: Hadoop YARN
>          Issue Type: New Feature
>          Components: nodemanager
>            Reporter: Chris Douglas
>            Assignee: Arun Suresh
>              Labels: oct16-hard
>         Attachments: YARN-4597.001.patch, YARN-4597.002.patch, YARN-4597.003.patch, YARN-4597.004.patch,
>                      YARN-4597.005.patch, YARN-4597.006.patch, YARN-4597.007.patch, YARN-4597.008.patch, YARN-4597.009.patch
>
>
> Currently, the NM immediately launches containers after resource localization. Several
> features could be more cleanly implemented if the NM included a separate stage for reserving
> resources.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: yarn-issues-help@hadoop.apache.org


Mime
View raw message