hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [3/4] hadoop git commit: YARN-4597. Introduce ContainerScheduler and a SCHEDULED state to NodeManager container lifecycle. (asuresh)
Date Tue, 15 Nov 2016 15:57:08 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
deleted file mode 100644
index 495f57a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
+++ /dev/null
@@ -1,686 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
-import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Class extending {@link ContainerManagerImpl} and is used when queuing at the
- * NM is enabled.
- */
-public class QueuingContainerManagerImpl extends ContainerManagerImpl {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(QueuingContainerManagerImpl.class);
-
-  private ConcurrentMap<ContainerId, AllocatedContainerInfo>
-        allocatedGuaranteedContainers;
-  private ConcurrentMap<ContainerId, AllocatedContainerInfo>
-        allocatedOpportunisticContainers;
-
-  private long allocatedMemoryOpportunistic;
-  private int allocatedVCoresOpportunistic;
-
-  private Queue<AllocatedContainerInfo> queuedGuaranteedContainers;
-  private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
-
-  private Set<ContainerId> opportunisticContainersToKill;
-  private final OpportunisticContainersStatus opportunisticContainersStatus;
-  private final ContainerQueuingLimit queuingLimit;
-
-  public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
-      DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
-      NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
-    super(context, exec, deletionContext, nodeStatusUpdater, metrics,
-        dirsHandler);
-    this.allocatedGuaranteedContainers = new ConcurrentHashMap<>();
-    this.allocatedOpportunisticContainers = new ConcurrentHashMap<>();
-    this.allocatedMemoryOpportunistic = 0;
-    this.allocatedVCoresOpportunistic = 0;
-    this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>();
-    this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
-    this.opportunisticContainersToKill = Collections.synchronizedSet(
-        new HashSet<ContainerId>());
-    this.opportunisticContainersStatus =
-        OpportunisticContainersStatus.newInstance();
-    this.queuingLimit = ContainerQueuingLimit.newInstance();
-  }
-
-  @Override
-  protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
-    return new QueuingApplicationEventDispatcher(
-        super.createApplicationEventDispatcher());
-  }
-
-  @Override
-  protected void startContainerInternal(
-      ContainerTokenIdentifier containerTokenIdentifier,
-      StartContainerRequest request) throws YarnException, IOException {
-    this.context.getQueuingContext().getQueuedContainers().put(
-        containerTokenIdentifier.getContainerID(), containerTokenIdentifier);
-
-    AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
-        containerTokenIdentifier, request,
-        containerTokenIdentifier.getExecutionType(), containerTokenIdentifier
-            .getResource(), getConfig());
-
-    // If there are already free resources for the container to start, and
-    // there are no queued containers waiting to be executed, start this
-    // container immediately.
-    if (queuedGuaranteedContainers.isEmpty() &&
-        queuedOpportunisticContainers.isEmpty() &&
-        getContainersMonitor().
-            hasResourcesAvailable(allocatedContInfo.getPti())) {
-      startAllocatedContainer(allocatedContInfo);
-    } else {
-      ContainerId cIdToStart = containerTokenIdentifier.getContainerID();
-      this.context.getNMStateStore().storeContainer(cIdToStart,
-          containerTokenIdentifier.getVersion(), request);
-      this.context.getNMStateStore().storeContainerQueued(cIdToStart);
-      LOG.info("No available resources for container {} to start its execution "
-          + "immediately.", cIdToStart);
-      if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
-        queuedGuaranteedContainers.add(allocatedContInfo);
-        // Kill running opportunistic containers to make space for
-        // guaranteed container.
-        killOpportunisticContainers(allocatedContInfo);
-      } else {
-        LOG.info("Opportunistic container {} will be queued at the NM.",
-            cIdToStart);
-        queuedOpportunisticContainers.add(allocatedContInfo);
-      }
-    }
-  }
-
-  @Override
-  protected void stopContainerInternal(ContainerId containerID)
-      throws YarnException, IOException {
-    Container container = this.context.getContainers().get(containerID);
-    // If container is null and distributed scheduling is enabled, container
-    // might be queued. Otherwise, container might not be handled by this NM.
-    if (container == null && this.context.getQueuingContext()
-        .getQueuedContainers().containsKey(containerID)) {
-      ContainerTokenIdentifier containerTokenId = this.context
-          .getQueuingContext().getQueuedContainers().remove(containerID);
-
-      boolean foundInQueue = removeQueuedContainer(containerID,
-          containerTokenId.getExecutionType());
-
-      if (foundInQueue) {
-        LOG.info("Removing queued container with ID " + containerID);
-        this.context.getQueuingContext().getKilledQueuedContainers().put(
-            containerTokenId,
-            "Queued container request removed by ApplicationMaster.");
-        this.context.getNMStateStore().storeContainerKilled(containerID);
-      } else {
-        // The container started execution in the meanwhile.
-        try {
-          stopContainerInternalIfRunning(containerID);
-        } catch (YarnException | IOException e) {
-          LOG.error("Container did not get removed successfully.", e);
-        }
-      }
-
-      nodeStatusUpdater.sendOutofBandHeartBeat();
-    } else {
-      super.stopContainerInternal(containerID);
-    }
-  }
-
-  /**
-   * Start the execution of the given container. Also add it to the allocated
-   * containers, and update allocated resource utilization.
-   */
-  private void startAllocatedContainer(
-      AllocatedContainerInfo allocatedContainerInfo) {
-    ProcessTreeInfo pti = allocatedContainerInfo.getPti();
-
-    if (allocatedContainerInfo.getExecutionType() ==
-        ExecutionType.GUARANTEED) {
-      allocatedGuaranteedContainers.put(pti.getContainerId(),
-          allocatedContainerInfo);
-    } else {
-      allocatedOpportunisticContainers.put(pti.getContainerId(),
-          allocatedContainerInfo);
-      allocatedMemoryOpportunistic += pti.getPmemLimit();
-      allocatedVCoresOpportunistic += pti.getCpuVcores();
-    }
-
-    getContainersMonitor().increaseContainersAllocation(pti);
-
-    // Start execution of container.
-    ContainerId containerId = allocatedContainerInfo
-        .getContainerTokenIdentifier().getContainerID();
-    this.context.getQueuingContext().getQueuedContainers().remove(containerId);
-    try {
-      LOG.info("Starting container [" + containerId + "]");
-      super.startContainerInternal(
-          allocatedContainerInfo.getContainerTokenIdentifier(),
-          allocatedContainerInfo.getStartRequest());
-    } catch (YarnException | IOException e) {
-      containerFailedToStart(pti.getContainerId(),
-          allocatedContainerInfo.getContainerTokenIdentifier());
-      LOG.error("Container failed to start.", e);
-    }
-  }
-
-  private void containerFailedToStart(ContainerId containerId,
-      ContainerTokenIdentifier containerTokenId) {
-    this.context.getQueuingContext().getQueuedContainers().remove(containerId);
-
-    removeAllocatedContainer(containerId);
-
-    this.context.getQueuingContext().getKilledQueuedContainers().put(
-        containerTokenId,
-        "Container removed from queue as it failed to start.");
-  }
-
-  /**
-   * Remove the given container from the container queues.
-   *
-   * @return true if the container was found in one of the queues.
-   */
-  private boolean removeQueuedContainer(ContainerId containerId,
-      ExecutionType executionType) {
-    Queue<AllocatedContainerInfo> queue =
-        (executionType == ExecutionType.GUARANTEED) ?
-            queuedGuaranteedContainers : queuedOpportunisticContainers;
-
-    boolean foundInQueue = false;
-    Iterator<AllocatedContainerInfo> iter = queue.iterator();
-    while (iter.hasNext() && !foundInQueue) {
-      if (iter.next().getPti().getContainerId().equals(containerId)) {
-        iter.remove();
-        foundInQueue = true;
-      }
-    }
-
-    return foundInQueue;
-  }
-
-  /**
-   * Remove the given container from the allocated containers, and update
-   * allocated container utilization accordingly.
-   */
-  private void removeAllocatedContainer(ContainerId containerId) {
-    AllocatedContainerInfo contToRemove = null;
-
-    contToRemove = allocatedGuaranteedContainers.remove(containerId);
-
-    if (contToRemove == null) {
-      contToRemove = allocatedOpportunisticContainers.remove(containerId);
-    }
-
-    // If container was indeed running, update allocated resource utilization.
-    if (contToRemove != null) {
-      getContainersMonitor().decreaseContainersAllocation(contToRemove
-          .getPti());
-
-      if (contToRemove.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        allocatedMemoryOpportunistic -= contToRemove.getPti().getPmemLimit();
-        allocatedVCoresOpportunistic -= contToRemove.getPti().getCpuVcores();
-      }
-    }
-  }
-
-  /**
-   * Stop a container only if it is currently running. If queued, do not stop
-   * it.
-   */
-  private void stopContainerInternalIfRunning(ContainerId containerID)
-      throws YarnException, IOException {
-    if (this.context.getContainers().containsKey(containerID)) {
-      stopContainerInternal(containerID);
-    }
-  }
-
-  /**
-   * Kill opportunistic containers to free up resources for running the given
-   * container.
-   *
-   * @param allocatedContInfo
-   *          the container whose execution needs to start by freeing up
-   *          resources occupied by opportunistic containers.
-   */
-  private void killOpportunisticContainers(
-      AllocatedContainerInfo allocatedContInfo) {
-    ContainerId containerToStartId = allocatedContInfo.getPti()
-        .getContainerId();
-    List<ContainerId> extraOpportContainersToKill =
-        pickOpportunisticContainersToKill(containerToStartId);
-
-    // Kill the opportunistic containers that were chosen.
-    for (ContainerId contIdToKill : extraOpportContainersToKill) {
-      try {
-        stopContainerInternalIfRunning(contIdToKill);
-      } catch (YarnException | IOException e) {
-        LOG.error("Container did not get removed successfully.", e);
-      }
-      LOG.info(
-          "Opportunistic container {} will be killed in order to start the "
-              + "execution of guaranteed container {}.",
-              contIdToKill, containerToStartId);
-    }
-  }
-
-  /**
-   * Choose the opportunistic containers to kill in order to free up resources
-   * for running the given container.
-   *
-   * @param containerToStartId
-   *          the container whose execution needs to start by freeing up
-   *          resources occupied by opportunistic containers.
-   * @return the additional opportunistic containers that need to be killed.
-   */
-  protected List<ContainerId> pickOpportunisticContainersToKill(
-      ContainerId containerToStartId) {
-    // The additional opportunistic containers that need to be killed for the
-    // given container to start.
-    List<ContainerId> extraOpportContainersToKill = new ArrayList<>();
-    // Track resources that need to be freed.
-    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
-        containerToStartId);
-
-    // Go over the running opportunistic containers. Avoid containers that have
-    // already been marked for killing.
-    boolean hasSufficientResources = false;
-    for (Map.Entry<ContainerId, AllocatedContainerInfo> runningOpportCont :
-        allocatedOpportunisticContainers.entrySet()) {
-      ContainerId runningOpportContId = runningOpportCont.getKey();
-
-      // If there are sufficient resources to execute the given container, do
-      // not kill more opportunistic containers.
-      if (resourcesToFreeUp.getPhysicalMemory() <= 0 &&
-          resourcesToFreeUp.getVirtualMemory() <= 0 &&
-          resourcesToFreeUp.getCPU() <= 0.0f) {
-        hasSufficientResources = true;
-        break;
-      }
-
-      if (!opportunisticContainersToKill.contains(runningOpportContId)) {
-        extraOpportContainersToKill.add(runningOpportContId);
-        opportunisticContainersToKill.add(runningOpportContId);
-        getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp,
-            runningOpportCont.getValue().getPti());
-      }
-    }
-
-    if (!hasSufficientResources) {
-      LOG.info(
-          "There are no sufficient resources to start guaranteed {} even after "
-              + "attempting to kill any running opportunistic containers.",
-          containerToStartId);
-    }
-
-    return extraOpportContainersToKill;
-  }
-
-  /**
-   * Calculates the amount of resources that need to be freed up (by killing
-   * opportunistic containers) in order for the given guaranteed container to
-   * start its execution. Resource allocation to be freed up =
-   * <code>containersAllocation</code> -
-   *   allocation of <code>opportunisticContainersToKill</code> +
-   *   allocation of <code>queuedGuaranteedContainers</code> that will start
-   *     before the given container +
-   *   allocation of given container -
-   *   total resources of node.
-   *
-   * @param containerToStartId
-   *          the ContainerId of the guaranteed container for which we need to
-   *          free resources, so that its execution can start.
-   * @return the resources that need to be freed up for the given guaranteed
-   *         container to start.
-   */
-  private ResourceUtilization resourcesToFreeUp(
-      ContainerId containerToStartId) {
-    // Get allocation of currently allocated containers.
-    ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
-        .newInstance(getContainersMonitor().getContainersAllocation());
-
-    // Subtract from the allocation the allocation of the opportunistic
-    // containers that are marked for killing.
-    for (ContainerId opportContId : opportunisticContainersToKill) {
-      if (allocatedOpportunisticContainers.containsKey(opportContId)) {
-        getContainersMonitor().decreaseResourceUtilization(
-            resourceAllocationToFreeUp,
-            allocatedOpportunisticContainers.get(opportContId).getPti());
-      }
-    }
-    // Add to the allocation the allocation of the pending guaranteed
-    // containers that will start before the current container will be started.
-    for (AllocatedContainerInfo guarContInfo : queuedGuaranteedContainers) {
-      getContainersMonitor().increaseResourceUtilization(
-          resourceAllocationToFreeUp, guarContInfo.getPti());
-      if (guarContInfo.getPti().getContainerId().equals(containerToStartId)) {
-        break;
-      }
-    }
-    // Subtract the overall node resources.
-    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
-        resourceAllocationToFreeUp);
-    return resourceAllocationToFreeUp;
-  }
-
-  /**
-   * If there are available resources, try to start as many pending containers
-   * as possible.
-   */
-  private void startPendingContainers() {
-    // Start pending guaranteed containers, if resources available.
-    boolean resourcesAvailable =
-        startContainersFromQueue(queuedGuaranteedContainers);
-
-    // Start opportunistic containers, if resources available.
-    if (resourcesAvailable) {
-      startContainersFromQueue(queuedOpportunisticContainers);
-    }
-  }
-
-  private boolean startContainersFromQueue(
-      Queue<AllocatedContainerInfo> queuedContainers) {
-    Iterator<AllocatedContainerInfo> guarIter = queuedContainers.iterator();
-    boolean resourcesAvailable = true;
-
-    while (guarIter.hasNext() && resourcesAvailable) {
-      AllocatedContainerInfo allocatedContInfo = guarIter.next();
-
-      if (getContainersMonitor().hasResourcesAvailable(
-          allocatedContInfo.getPti())) {
-        startAllocatedContainer(allocatedContInfo);
-        guarIter.remove();
-      } else {
-        resourcesAvailable = false;
-      }
-    }
-    return resourcesAvailable;
-  }
-
-  @Override
-  protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
-      NMTokenIdentifier nmTokenIdentifier) throws YarnException {
-    Container container = this.context.getContainers().get(containerID);
-    if (container == null) {
-      ContainerTokenIdentifier containerTokenId = this.context
-          .getQueuingContext().getQueuedContainers().get(containerID);
-      if (containerTokenId != null) {
-        ExecutionType executionType = this.context.getQueuingContext()
-            .getQueuedContainers().get(containerID).getExecutionType();
-        return BuilderUtils.newContainerStatus(containerID,
-            org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "",
-            ContainerExitStatus.INVALID, this.context.getQueuingContext()
-                .getQueuedContainers().get(containerID).getResource(),
-            executionType);
-      } else {
-        // Check if part of the stopped/killed queued containers.
-        for (ContainerTokenIdentifier cTokenId : this.context
-            .getQueuingContext().getKilledQueuedContainers().keySet()) {
-          if (cTokenId.getContainerID().equals(containerID)) {
-            return BuilderUtils.newContainerStatus(containerID,
-                org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
-                this.context.getQueuingContext().getKilledQueuedContainers()
-                    .get(cTokenId), ContainerExitStatus.ABORTED, cTokenId
-                        .getResource(), cTokenId.getExecutionType());
-          }
-        }
-      }
-    }
-    return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
-  }
-
-  /**
-   * Recover running or queued container.
-   */
-  @Override
-  protected void recoverActiveContainer(
-      ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
-      RecoveredContainerState rcs) throws IOException {
-    if (rcs.getStatus() ==
-        RecoveredContainerStatus.QUEUED && !rcs.getKilled()) {
-      LOG.info(token.getContainerID()
-          + "will be added to the queued containers.");
-
-      AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
-          token, rcs.getStartRequest(), token.getExecutionType(),
-              token.getResource(), getConfig());
-
-      this.context.getQueuingContext().getQueuedContainers().put(
-          token.getContainerID(), token);
-
-      if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
-        queuedGuaranteedContainers.add(allocatedContInfo);
-        // Kill running opportunistic containers to make space for
-        // guaranteed container.
-        killOpportunisticContainers(allocatedContInfo);
-      } else {
-        queuedOpportunisticContainers.add(allocatedContInfo);
-      }
-    } else {
-      super.recoverActiveContainer(launchContext, token, rcs);
-    }
-  }
-
-  @VisibleForTesting
-  public int getNumAllocatedGuaranteedContainers() {
-    return allocatedGuaranteedContainers.size();
-  }
-
-  @VisibleForTesting
-  public int getNumAllocatedOpportunisticContainers() {
-    return allocatedOpportunisticContainers.size();
-  }
-
-  @VisibleForTesting
-  public int getNumQueuedGuaranteedContainers() {
-    return queuedGuaranteedContainers.size();
-  }
-
-  @VisibleForTesting
-  public int getNumQueuedOpportunisticContainers() {
-    return queuedOpportunisticContainers.size();
-  }
-
-  class QueuingApplicationEventDispatcher implements
-      EventHandler<ApplicationEvent> {
-    private EventHandler<ApplicationEvent> applicationEventDispatcher;
-
-    public QueuingApplicationEventDispatcher(
-        EventHandler<ApplicationEvent> applicationEventDispatcher) {
-      this.applicationEventDispatcher = applicationEventDispatcher;
-    }
-
-    @Override
-    public void handle(ApplicationEvent event) {
-      if (event.getType() ==
-          ApplicationEventType.APPLICATION_CONTAINER_FINISHED) {
-        if (!(event instanceof ApplicationContainerFinishedEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        ApplicationContainerFinishedEvent finishEvent =
-            (ApplicationContainerFinishedEvent) event;
-        // Remove finished container from the allocated containers, and
-        // attempt to start new containers.
-        ContainerId contIdToRemove = finishEvent.getContainerID();
-        removeAllocatedContainer(contIdToRemove);
-        opportunisticContainersToKill.remove(contIdToRemove);
-        startPendingContainers();
-      }
-      this.applicationEventDispatcher.handle(event);
-    }
-  }
-
-  @Override
-  public OpportunisticContainersStatus getOpportunisticContainersStatus() {
-    opportunisticContainersStatus
-        .setRunningOpportContainers(allocatedOpportunisticContainers.size());
-    opportunisticContainersStatus
-        .setOpportMemoryUsed(allocatedMemoryOpportunistic);
-    opportunisticContainersStatus
-        .setOpportCoresUsed(allocatedVCoresOpportunistic);
-    opportunisticContainersStatus
-        .setQueuedOpportContainers(queuedOpportunisticContainers.size());
-    opportunisticContainersStatus.setWaitQueueLength(
-        queuedGuaranteedContainers.size() +
-            queuedOpportunisticContainers.size());
-    return opportunisticContainersStatus;
-  }
-
-  @Override
-  public void updateQueuingLimit(ContainerQueuingLimit limit) {
-    this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
-    // TODO: Include wait time as well once it is implemented
-    if (this.queuingLimit.getMaxQueueLength() > -1) {
-      shedQueuedOpportunisticContainers();
-    }
-  }
-
-  private void shedQueuedOpportunisticContainers() {
-    int numAllowed = this.queuingLimit.getMaxQueueLength();
-    Iterator<AllocatedContainerInfo> containerIter =
-        queuedOpportunisticContainers.iterator();
-    while (containerIter.hasNext()) {
-      AllocatedContainerInfo cInfo = containerIter.next();
-      if (numAllowed <= 0) {
-        containerIter.remove();
-        ContainerTokenIdentifier containerTokenIdentifier = this.context
-            .getQueuingContext().getQueuedContainers().remove(
-                cInfo.getContainerTokenIdentifier().getContainerID());
-        // The Container might have already started while we were
-        // iterating..
-        if (containerTokenIdentifier != null) {
-          this.context.getQueuingContext().getKilledQueuedContainers()
-              .putIfAbsent(cInfo.getContainerTokenIdentifier(),
-                  "Container de-queued to meet NM queuing limits. "
-                      + "Max Queue length["
-                      + this.queuingLimit.getMaxQueueLength() + "]");
-        }
-      }
-      numAllowed--;
-    }
-  }
-
-
-  static class AllocatedContainerInfo {
-    private final ContainerTokenIdentifier containerTokenIdentifier;
-    private final StartContainerRequest startRequest;
-    private final ExecutionType executionType;
-    private final ProcessTreeInfo pti;
-
-    AllocatedContainerInfo(ContainerTokenIdentifier containerTokenIdentifier,
-        StartContainerRequest startRequest, ExecutionType executionType,
-        Resource resource, Configuration conf) {
-      this.containerTokenIdentifier = containerTokenIdentifier;
-      this.startRequest = startRequest;
-      this.executionType = executionType;
-      this.pti = createProcessTreeInfo(containerTokenIdentifier
-          .getContainerID(), resource, conf);
-    }
-
-    private ContainerTokenIdentifier getContainerTokenIdentifier() {
-      return this.containerTokenIdentifier;
-    }
-
-    private StartContainerRequest getStartRequest() {
-      return this.startRequest;
-    }
-
-    private ExecutionType getExecutionType() {
-      return this.executionType;
-    }
-
-    protected ProcessTreeInfo getPti() {
-      return this.pti;
-    }
-
-    private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId,
-        Resource resource, Configuration conf) {
-      long pmemBytes = resource.getMemorySize() * 1024 * 1024L;
-      float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
-          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-      long vmemBytes = (long) (pmemRatio * pmemBytes);
-      int cpuVcores = resource.getVirtualCores();
-
-      return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes,
-          cpuVcores);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      boolean equal = false;
-      if (obj instanceof AllocatedContainerInfo) {
-        AllocatedContainerInfo otherContInfo = (AllocatedContainerInfo) obj;
-        equal = this.getPti().getContainerId()
-            .equals(otherContInfo.getPti().getContainerId());
-      }
-      return equal;
-    }
-
-    @Override
-    public int hashCode() {
-      return this.getPti().getContainerId().hashCode();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
deleted file mode 100644
index 0250807..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * This package contains classes related to the queuing of containers at
- * the NM.
- *
- */
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
new file mode 100644
index 0000000..9839aeb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link ResourceUtilizationTracker} that equates
+ * resource utilization with the total resource allocated to the container.
+ */
+public class AllocationBasedResourceUtilizationTracker implements
+    ResourceUtilizationTracker {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
+
+  private ResourceUtilization containersAllocation;
+  private ContainerScheduler scheduler;
+
+  AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) {
+    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Get the accumulated allocation of the containers added to this tracker.
+   * @return the tracker's own ResourceUtilization instance (not a copy).
+   */
+  @Override
+  public ResourceUtilization getCurrentUtilization() {
+    return this.containersAllocation;
+  }
+
+  /**
+   * Add Container's resources to the accumulated Utilization.
+   * @param container Container.
+   */
+  @Override
+  public void addContainerResources(Container container) {
+    ContainersMonitor.increaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  /**
+   * Subtract Container's resources from the accumulated Utilization.
+   * @param container Container.
+   */
+  @Override
+  public void subtractContainerResource(Container container) {
+    ContainersMonitor.decreaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  /**
+   * Check if NM has resources available currently to run the container.
+   * @param container Container.
+   * @return True, if NM has resources available currently to run the container.
+   */
+  @Override
+  public boolean hasResourcesAvailable(Container container) {
+    long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
+    return hasResourcesAvailable(pMemBytes,
+        (long) (getContainersMonitor().getVmemRatio()* pMemBytes),
+        container.getResource().getVirtualCores());
+  }
+
+  private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
+      int cpuVcores) {
+    // Check physical memory; ">> 20" converts the byte counts to MB.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
+          this.containersAllocation.getPhysicalMemory(),
+          (pMemBytes >> 20),
+          (getContainersMonitor().getPmemAllocatedForContainers() >> 20));
+    }
+    if (this.containersAllocation.getPhysicalMemory() +
+        (int) (pMemBytes >> 20) >
+        (int) (getContainersMonitor()
+            .getPmemAllocatedForContainers() >> 20)) {
+      return false;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("before vMemCheck " +
+              "[isEnabled={}, current={} + asked={} > allowed={}]",
+          getContainersMonitor().isVmemCheckEnabled(),
+          this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
+          (getContainersMonitor().getVmemAllocatedForContainers() >> 20));
+    }
+    // Check virtual memory, but only when the monitor's vmem check is on.
+    if (getContainersMonitor().isVmemCheckEnabled() &&
+        this.containersAllocation.getVirtualMemory() +
+            (int) (vMemBytes >> 20) >
+            (int) (getContainersMonitor()
+                .getVmemAllocatedForContainers() >> 20)) {
+      return false;
+    }
+
+    float vCores = (float) cpuVcores /
+        getContainersMonitor().getVCoresAllocatedForContainers();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("before cpuCheck [current={} + asked={} > allowed=1.0]",
+          this.containersAllocation.getCPU(), vCores);
+    }
+    // Check CPU: vCores is the asked share of the container vcores.
+    if (this.containersAllocation.getCPU() + vCores > 1.0f) {
+      return false;
+    }
+    return true;
+  }
+
+  public ContainersMonitor getContainersMonitor() {
+    return this.scheduler.getContainersMonitor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
new file mode 100644
index 0000000..0c2b1ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The ContainerScheduler manages a collection of runnable containers. It
+ * ensures that a container is launched only if all its launch criteria are
+ * met. It also ensures that OPPORTUNISTIC containers are killed to make
+ * room for GUARANTEED containers.
+ */
+public class ContainerScheduler extends AbstractService implements
+    EventHandler<ContainerSchedulerEvent> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerScheduler.class);
+
+  private final Context context;
+  private final int maxOppQueueLength;
+
+  // Queue of Guaranteed Containers waiting for resources to run
+  private final LinkedHashMap<ContainerId, Container>
+      queuedGuaranteedContainers = new LinkedHashMap<>();
+  // Queue of Opportunistic Containers waiting for resources to run
+  private final LinkedHashMap<ContainerId, Container>
+      queuedOpportunisticContainers = new LinkedHashMap<>();
+
+  // Used to keep track of containers that have been marked to be killed
+  // to make room for a guaranteed container.
+  private final Map<ContainerId, Container> oppContainersToKill =
+      new HashMap<>();
+
+  // Containers launched by the Scheduler will take a while to actually
+  // move to the RUNNING state, but should still be fair game for killing
+  // by the scheduler to make room for guaranteed containers. This holds
+  // containers that are in RUNNING as well as those in SCHEDULED state that
+  // have been marked to run, but not yet RUNNING.
+  private final LinkedHashMap<ContainerId, Container> runningContainers =
+      new LinkedHashMap<>();
+
+  private final ContainerQueuingLimit queuingLimit =
+      ContainerQueuingLimit.newInstance();
+
+  private final OpportunisticContainersStatus opportunisticContainersStatus;
+
+  // Resource Utilization Tracker that decides how utilization of the cluster
+  // increases / decreases based on container start / finish
+  private ResourceUtilizationTracker utilizationTracker;
+
+  private final AsyncDispatcher dispatcher;
+  private final NodeManagerMetrics metrics;
+
+  /**
+   * Instantiate a Container Scheduler.
+   * @param context NodeManager Context.
+   * @param dispatcher AsyncDispatcher.
+   * @param metrics NodeManagerMetrics.
+   */
+  public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+      NodeManagerMetrics metrics) {
+    this(context, dispatcher, metrics, context.getConf().getInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+        YarnConfiguration.
+            NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT));
+  }
+
+  @VisibleForTesting
+  public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+      NodeManagerMetrics metrics, int qLength) {
+    super(ContainerScheduler.class.getName());
+    this.context = context;
+    this.dispatcher = dispatcher;
+    this.metrics = metrics;
+    this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
+    this.utilizationTracker =
+        new AllocationBasedResourceUtilizationTracker(this);
+    this.opportunisticContainersStatus =
+        OpportunisticContainersStatus.newInstance();
+  }
+
+  /**
+   * Handle ContainerSchedulerEvents.
+   * @param event ContainerSchedulerEvent.
+   */
+  @Override
+  public void handle(ContainerSchedulerEvent event) {
+    switch (event.getType()) {
+    case SCHEDULE_CONTAINER:
+      scheduleContainer(event.getContainer());
+      break;
+    case CONTAINER_COMPLETED:
+      onContainerCompleted(event.getContainer());
+      break;
+    case SHED_QUEUED_CONTAINERS:
+      shedQueuedOpportunisticContainers();
+      break;
+    default:
+      LOG.error("Unknown event arrived at ContainerScheduler: "
+          + event.toString());
+    }
+  }
+
+  /**
+   * Return number of queued containers.
+   * @return Number of queued containers.
+   */
+  public int getNumQueuedContainers() {
+    return this.queuedGuaranteedContainers.size()
+        + this.queuedOpportunisticContainers.size();
+  }
+
+  @VisibleForTesting
+  public int getNumQueuedGuaranteedContainers() {
+    return this.queuedGuaranteedContainers.size();
+  }
+
+  @VisibleForTesting
+  public int getNumQueuedOpportunisticContainers() {
+    return this.queuedOpportunisticContainers.size();
+  }
+
+  public OpportunisticContainersStatus getOpportunisticContainersStatus() {
+    this.opportunisticContainersStatus.setQueuedOpportContainers(
+        getNumQueuedOpportunisticContainers());
+    this.opportunisticContainersStatus.setWaitQueueLength(
+        getNumQueuedContainers());
+    this.opportunisticContainersStatus.setOpportMemoryUsed(
+        metrics.getOpportMemoryUsed());
+    this.opportunisticContainersStatus.setOpportCoresUsed(
+        metrics.getOpportCoresUsed());
+    this.opportunisticContainersStatus.setRunningOpportContainers(
+        metrics.getRunningOpportContainers());
+    return this.opportunisticContainersStatus;
+  }
+
+  private void onContainerCompleted(Container container) {
+    oppContainersToKill.remove(container.getContainerId());
+
+    // This could be killed externally for eg. by the ContainerManager,
+    // in which case, the container might still be queued.
+    Container queued =
+        queuedOpportunisticContainers.remove(container.getContainerId());
+    if (queued == null) {
+      queuedGuaranteedContainers.remove(container.getContainerId());
+    }
+
+    // decrement only if it was a running container
+    Container completedContainer = runningContainers.remove(container
+        .getContainerId());
+    if (completedContainer != null) {
+      this.utilizationTracker.subtractContainerResource(container);
+      if (container.getContainerTokenIdentifier().getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+        this.metrics.opportunisticContainerCompleted(container);
+      }
+      startPendingContainers();
+    }
+  }
+
+  private void startPendingContainers() {
+    // Start pending guaranteed containers, if resources available.
+    boolean resourcesAvailable =
+        startContainersFromQueue(queuedGuaranteedContainers.values());
+    // Start opportunistic containers, if resources available.
+    if (resourcesAvailable) {
+      startContainersFromQueue(queuedOpportunisticContainers.values());
+    }
+  }
+
+  private boolean startContainersFromQueue(
+      Collection<Container> queuedContainers) {
+    Iterator<Container> cIter = queuedContainers.iterator();
+    boolean resourcesAvailable = true;
+    while (cIter.hasNext() && resourcesAvailable) {
+      Container container = cIter.next();
+      if (this.utilizationTracker.hasResourcesAvailable(container)) {
+        startAllocatedContainer(container);
+        cIter.remove();
+      } else {
+        resourcesAvailable = false;
+      }
+    }
+    return resourcesAvailable;
+  }
+
+  @VisibleForTesting
+  protected void scheduleContainer(Container container) {
+    if (maxOppQueueLength <= 0) {
+      startAllocatedContainer(container);
+      return;
+    }
+    if (queuedGuaranteedContainers.isEmpty() &&
+        queuedOpportunisticContainers.isEmpty() &&
+        this.utilizationTracker.hasResourcesAvailable(container)) {
+      startAllocatedContainer(container);
+    } else {
+      LOG.info("No available resources for container {} to start its execution "
+          + "immediately.", container.getContainerId());
+      boolean isQueued = true;
+      if (container.getContainerTokenIdentifier().getExecutionType() ==
+          ExecutionType.GUARANTEED) {
+        queuedGuaranteedContainers.put(container.getContainerId(), container);
+        // Kill running opportunistic containers to make space for
+        // guaranteed container.
+        killOpportunisticContainers(container);
+      } else {
+        if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
+          LOG.info("Opportunistic container {} will be queued at the NM.",
+              container.getContainerId());
+          queuedOpportunisticContainers.put(
+              container.getContainerId(), container);
+        } else {
+          isQueued = false;
+          LOG.info("Opportunistic container [{}] will not be queued at the NM" +
+              " since max queue length [{}] has been reached",
+              container.getContainerId(), maxOppQueueLength);
+          container.sendKillEvent(
+              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+              "Opportunistic container queue is full.");
+        }
+      }
+      if (isQueued) {
+        try {
+          this.context.getNMStateStore().storeContainerQueued(
+              container.getContainerId());
+        } catch (IOException e) {
+          LOG.warn("Could not store container [" + container.getContainerId()
+              + "] state. The Container has been queued.", e);
+        }
+      }
+    }
+  }
+
+  private void killOpportunisticContainers(Container container) {
+    List<Container> extraOpportContainersToKill =
+        pickOpportunisticContainersToKill(container.getContainerId());
+    // Kill the opportunistic containers that were chosen.
+    for (Container contToKill : extraOpportContainersToKill) {
+      contToKill.sendKillEvent(
+          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+          "Container Killed to make room for Guaranteed Container.");
+      oppContainersToKill.put(contToKill.getContainerId(), contToKill);
+      LOG.info(
+          "Opportunistic container {} will be killed in order to start the "
+              + "execution of guaranteed container {}.",
+          contToKill.getContainerId(), container.getContainerId());
+    }
+  }
+
+  private void startAllocatedContainer(Container container) {
+    LOG.info("Starting container [" + container.getContainerId()+ "]");
+    runningContainers.put(container.getContainerId(), container);
+    this.utilizationTracker.addContainerResources(container);
+    if (container.getContainerTokenIdentifier().getExecutionType() ==
+        ExecutionType.OPPORTUNISTIC) {
+      this.metrics.opportunisticContainerStarted(container);
+    }
+    container.sendLaunchEvent();
+  }
+
+  private List<Container> pickOpportunisticContainersToKill(
+      ContainerId containerToStartId) {
+    // The opportunistic containers that need to be killed for the
+    // given container to start.
+    List<Container> extraOpportContainersToKill = new ArrayList<>();
+    // Track resources that need to be freed.
+    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+        containerToStartId);
+
+    // Go over the running opportunistic containers.
+    // Use a descending iterator to kill more recently started containers.
+    Iterator<Container> lifoIterator = new LinkedList<>(
+        runningContainers.values()).descendingIterator();
+    while(lifoIterator.hasNext() &&
+        !hasSufficientResources(resourcesToFreeUp)) {
+      Container runningCont = lifoIterator.next();
+      if (runningCont.getContainerTokenIdentifier().getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+
+        if (oppContainersToKill.containsKey(
+            runningCont.getContainerId())) {
+          // These containers have already been marked to be killed.
+          // So exclude them..
+          continue;
+        }
+        extraOpportContainersToKill.add(runningCont);
+        ContainersMonitor.decreaseResourceUtilization(
+            getContainersMonitor(), resourcesToFreeUp,
+            runningCont.getResource());
+      }
+    }
+    if (!hasSufficientResources(resourcesToFreeUp)) {
+      LOG.warn("There are no sufficient resources to start guaranteed [{}] " +
+          "at the moment. Opportunistic containers are in the process of " +
+          "being killed to make room.", containerToStartId);
+    }
+    return extraOpportContainersToKill;
+  }
+
+  private boolean hasSufficientResources(
+      ResourceUtilization resourcesToFreeUp) {
+    return resourcesToFreeUp.getPhysicalMemory() <= 0 &&
+        resourcesToFreeUp.getVirtualMemory() <= 0 &&
+        resourcesToFreeUp.getCPU() <= 0.0f;
+  }
+
+  private ResourceUtilization resourcesToFreeUp(
+      ContainerId containerToStartId) {
+    // Get allocation of currently allocated containers.
+    ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
+        .newInstance(this.utilizationTracker.getCurrentUtilization());
+
+    // Add to the allocation the allocation of the pending guaranteed
+    // containers that will start before the current container will be started.
+    for (Container container : queuedGuaranteedContainers.values()) {
+      ContainersMonitor.increaseResourceUtilization(
+          getContainersMonitor(), resourceAllocationToFreeUp,
+          container.getResource());
+      if (container.getContainerId().equals(containerToStartId)) {
+        break;
+      }
+    }
+
+    // These resources are being freed, likely at the behest of another
+    // guaranteed container..
+    for (Container container : oppContainersToKill.values()) {
+      ContainersMonitor.decreaseResourceUtilization(
+          getContainersMonitor(), resourceAllocationToFreeUp,
+          container.getResource());
+    }
+
+    // Subtract the overall node resources.
+    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+        resourceAllocationToFreeUp);
+    return resourceAllocationToFreeUp;
+  }
+
+  @SuppressWarnings("unchecked")
+  public void updateQueuingLimit(ContainerQueuingLimit limit) {
+    this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
+    // YARN-2886 should add support for wait-times. Include wait time as
+    // well once it is implemented
+    if ((queuingLimit.getMaxQueueLength() > -1) &&
+        (queuingLimit.getMaxQueueLength() <
+            queuedOpportunisticContainers.size())) {
+      dispatcher.getEventHandler().handle(
+          new ContainerSchedulerEvent(null,
+              ContainerSchedulerEventType.SHED_QUEUED_CONTAINERS));
+    }
+  }
+
+  private void shedQueuedOpportunisticContainers() {
+    int numAllowed = this.queuingLimit.getMaxQueueLength();
+    Iterator<Container> containerIter =
+        queuedOpportunisticContainers.values().iterator();
+    while (containerIter.hasNext()) {
+      Container container = containerIter.next();
+      if (numAllowed <= 0) {
+        container.sendKillEvent(
+            ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+            "Container De-queued to meet NM queuing limits.");
+        containerIter.remove();
+        LOG.info(
+            "Opportunistic container {} will be killed to meet NM queuing" +
+                " limits.", container.getContainerId());
+      }
+      numAllowed--;
+    }
+  }
+
+  public ContainersMonitor getContainersMonitor() {
+    return this.context.getContainerManager().getContainersMonitor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEvent.java
new file mode 100644
index 0000000..460aaeb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+    .Container;
+
+/**
+ * Events consumed by the {@link ContainerScheduler}.
+ */
+public class ContainerSchedulerEvent extends
+    AbstractEvent<ContainerSchedulerEventType> {
+
+  private final Container container;
+
+  /**
+   * Create instance of Event.
+   * @param container Container.
+   * @param eventType EventType.
+   */
+  public ContainerSchedulerEvent(Container container,
+      ContainerSchedulerEventType eventType) {
+    super(eventType);
+    this.container = container;
+  }
+
+  /**
+   * Get the container associated with the event.
+   * @return Container.
+   */
+  public Container getContainer() {
+    return container;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
new file mode 100644
index 0000000..086cb9b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+/**
+ * Event types associated with {@link ContainerSchedulerEvent}.
+ */
+public enum ContainerSchedulerEventType {
+  SCHEDULE_CONTAINER,
+  CONTAINER_COMPLETED,
+  // Producer: Node HB response - RM has asked to shed the queue
+  SHED_QUEUED_CONTAINERS,
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
new file mode 100644
index 0000000..3c17eca
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+/**
+ * This interface abstracts out how a container contributes to
+ * Resource Utilization of the node.
+ * It is used by the {@link ContainerScheduler} to determine which
+ * OPPORTUNISTIC containers to be killed to make room for a GUARANTEED
+ * container.
+ */
+public interface ResourceUtilizationTracker {
+
+  /**
+   * Get the current total utilization of all the Containers running on
+   * the node.
+   * @return ResourceUtilization Resource Utilization.
+   */
+  ResourceUtilization getCurrentUtilization();
+
+  /**
+   * Add Container's resources to Node Utilization.
+   * @param container Container.
+   */
+  void addContainerResources(Container container);
+
+  /**
+   * Subtract Container's resources from Node Utilization.
+   * @param container Container.
+   */
+  void subtractContainerResource(Container container);
+
+  /**
+   * Check if NM has resources available currently to run the container.
+   * @param container Container.
+   * @return True, if NM has resources available currently to run the container.
+   */
+  boolean hasResourcesAvailable(Container container);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/package-info.java
new file mode 100644
index 0000000..4641ac0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Container Scheduler
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
index 6105eff..b001b63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
@@ -23,11 +23,14 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.yarn.api.records.Resource;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+    .Container;
 
 @Metrics(about="Metrics for node manager", context="yarn")
 public class NodeManagerMetrics {
@@ -60,6 +63,14 @@ public class NodeManagerMetrics {
       MutableGaugeInt goodLocalDirsDiskUtilizationPerc;
   @Metric("Disk utilization % on good log dirs")
       MutableGaugeInt goodLogDirsDiskUtilizationPerc;
+
+  @Metric("Memory used by Opportunistic Containers in MB")
+      MutableGaugeLong opportMemoryUsed;
+  @Metric("# of Virtual Cores used by opportunistic containers")
+      MutableGaugeInt opportCoresUsed;
+  @Metric("# of running opportunistic containers")
+      MutableGaugeInt runningOpportContainers;
+
   // CHECKSTYLE:ON:VisibilityModifier
 
   private JvmMetrics jvmMetrics = null;
@@ -130,6 +141,30 @@ public class NodeManagerMetrics {
     containersReIniting.decr();
   }
 
+  public long getOpportMemoryUsed() {
+    return opportMemoryUsed.value();
+  }
+
+  public int getOpportCoresUsed() {
+    return opportCoresUsed.value();
+  }
+
+  public int getRunningOpportContainers() {
+    return runningOpportContainers.value();
+  }
+
+  public void opportunisticContainerCompleted(Container container) {
+    opportMemoryUsed.decr(container.getResource().getMemorySize());
+    opportCoresUsed.decr(container.getResource().getVirtualCores());
+    runningOpportContainers.decr();
+  }
+
+  public void opportunisticContainerStarted(Container container) {
+    opportMemoryUsed.incr(container.getResource().getMemorySize());
+    opportCoresUsed.incr(container.getResource().getVirtualCores());
+    runningOpportContainers.incr();
+  }
+
   public void allocateContainer(Resource res) {
     allocatedContainers.incr();
     allocatedMB = allocatedMB + res.getMemorySize();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
index 35e7593..e1a9995 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -38,7 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
-import org.apache.hadoop.yarn.util.ConverterUtils;
+
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,7 +148,7 @@ public class ContainerLogsUtils {
   
   private static void checkState(ContainerState state) {
     if (state == ContainerState.NEW || state == ContainerState.LOCALIZING ||
-        state == ContainerState.LOCALIZED) {
+        state == ContainerState.SCHEDULED) {
       throw new NotFoundException("Container is not yet running. Current state is "
           + state);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
index 3b84a78..8e4522b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.fs.FileContext;
@@ -158,7 +159,7 @@ public class TestEventFlow {
     containerManager.startContainers(allRequests);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cID,
-        ContainerState.RUNNING);
+        Arrays.asList(ContainerState.RUNNING, ContainerState.SCHEDULED), 20);
 
     List<ContainerId> containerIds = new ArrayList<ContainerId>();
     containerIds.add(cID);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index f6593f9..04cfae9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -454,6 +454,14 @@ public class TestNodeManagerResync {
             if (containersShouldBePreserved) {
               Assert.assertFalse(containers.isEmpty());
               Assert.assertTrue(containers.containsKey(existingCid));
+              ContainerState state = containers.get(existingCid)
+                  .cloneAndGetContainerStatus().getState();
+              // Wait till RUNNING state, re-reading it after each sleep.
+              for (int i = 50; state != ContainerState.RUNNING && i > 0; i--) {
+                Thread.sleep(100);
+                state = containers.get(existingCid)
+                    .cloneAndGetContainerStatus().getState();
+              }
               Assert.assertEquals(ContainerState.RUNNING,
                   containers.get(existingCid)
                   .cloneAndGetContainerStatus().getState());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
index b3ad318..03e06d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -255,7 +256,9 @@ public class TestNodeManagerShutdown {
         GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus =
         containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
-    Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
+    Assert.assertTrue(
+        EnumSet.of(ContainerState.RUNNING, ContainerState.SCHEDULED)
+            .contains(containerStatus.getState()));
   }
   
   public static ContainerId createContainerId() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 59a4563..24bd02c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -1080,128 +1079,6 @@ public class TestNodeStatusUpdater {
     Assert.assertTrue(containerIdSet.contains(runningContainerId));
   }
 
-  @Test(timeout = 90000)
-  public void testKilledQueuedContainers() throws Exception {
-    NodeManager nm = new NodeManager();
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.set(
-        NodeStatusUpdaterImpl
-            .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
-        "10000");
-    nm.init(conf);
-    NodeStatusUpdaterImpl nodeStatusUpdater =
-        (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
-    ApplicationId appId = ApplicationId.newInstance(0, 0);
-    ApplicationAttemptId appAttemptId =
-        ApplicationAttemptId.newInstance(appId, 0);
-
-    // Add application to context.
-    nm.getNMContext().getApplications().putIfAbsent(appId,
-        mock(Application.class));
-
-    // Create a running container and add it to the context.
-    ContainerId runningContainerId =
-        ContainerId.newContainerId(appAttemptId, 1);
-    Token runningContainerToken =
-        BuilderUtils.newContainerToken(runningContainerId, 0, "anyHost",
-          1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
-          "password".getBytes(), 0);
-    Container runningContainer =
-        new ContainerImpl(conf, null, null, null, null,
-          BuilderUtils.newContainerTokenIdentifier(runningContainerToken),
-          nm.getNMContext()) {
-          @Override
-          public ContainerState getCurrentState() {
-            return ContainerState.RUNNING;
-          }
-
-          @Override
-          public org.apache.hadoop.yarn.server.nodemanager.containermanager.
-              container.ContainerState getContainerState() {
-            return org.apache.hadoop.yarn.server.nodemanager.containermanager.
-                container.ContainerState.RUNNING;
-          }
-        };
-
-    nm.getNMContext().getContainers()
-      .put(runningContainerId, runningContainer);
-
-    // Create two killed queued containers and add them to the queuing context.
-    ContainerId killedQueuedContainerId1 = ContainerId.newContainerId(
-        appAttemptId, 2);
-    ContainerTokenIdentifier killedQueuedContainerTokenId1 = BuilderUtils
-        .newContainerTokenIdentifier(BuilderUtils.newContainerToken(
-            killedQueuedContainerId1, 0, "anyHost", 1234, "anyUser",
-            BuilderUtils.newResource(1024, 1), 0, 123,
-            "password".getBytes(), 0));
-    ContainerId killedQueuedContainerId2 = ContainerId.newContainerId(
-        appAttemptId, 3);
-    ContainerTokenIdentifier killedQueuedContainerTokenId2 = BuilderUtils
-        .newContainerTokenIdentifier(BuilderUtils.newContainerToken(
-            killedQueuedContainerId2, 0, "anyHost", 1234, "anyUser",
-            BuilderUtils.newResource(1024, 1), 0, 123,
-            "password".getBytes(), 0));
-
-    nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put(
-        killedQueuedContainerTokenId1, "Queued container killed.");
-    nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put(
-        killedQueuedContainerTokenId2, "Queued container killed.");
-
-    List<ContainerStatus> containerStatuses = nodeStatusUpdater
-        .getContainerStatuses();
-
-    Assert.assertEquals(3, containerStatuses.size());
-
-    ContainerStatus runningContainerStatus = null;
-    ContainerStatus killedQueuedContainerStatus1 = null;
-    ContainerStatus killedQueuedContainerStatus2 = null;
-    for (ContainerStatus cStatus : containerStatuses) {
-      if (ContainerState.RUNNING == cStatus.getState()) {
-        runningContainerStatus = cStatus;
-      }
-      if (ContainerState.COMPLETE == cStatus.getState()) {
-        if (killedQueuedContainerId1.equals(cStatus.getContainerId())) {
-          killedQueuedContainerStatus1 = cStatus;
-        } else {
-          killedQueuedContainerStatus2 = cStatus;
-        }
-      }
-    }
-
-    // Check container IDs and Container Status.
-    Assert.assertNotNull(runningContainerId);
-    Assert.assertNotNull(killedQueuedContainerId1);
-    Assert.assertNotNull(killedQueuedContainerId2);
-
-    // Killed queued container should have ABORTED exit status.
-    Assert.assertEquals(ContainerExitStatus.ABORTED,
-        killedQueuedContainerStatus1.getExitStatus());
-    Assert.assertEquals(ContainerExitStatus.ABORTED,
-        killedQueuedContainerStatus2.getExitStatus());
-
-    // Killed queued container should appear in the recentlyStoppedContainers.
-    Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(
-        killedQueuedContainerId1));
-    Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(
-        killedQueuedContainerId2));
-
-    // Check if killed queued containers are successfully removed from the
-    // queuing context.
-    List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
-    ackedContainers.add(killedQueuedContainerId1);
-    ackedContainers.add(killedQueuedContainerId2);
-
-    nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(
-        ackedContainers);
-
-    containerStatuses = nodeStatusUpdater.getContainerStatuses();
-
-    // Only the running container should be in the container statuses now.
-    Assert.assertEquals(1, containerStatuses.size());
-    Assert.assertEquals(ContainerState.RUNNING,
-        containerStatuses.get(0).getState());
-  }
-
   @Test(timeout = 10000)
   public void testCompletedContainersIsRecentlyStopped() throws Exception {
     NodeManager nm = new NodeManager();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 031300f..7f96947 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@@ -153,7 +154,7 @@ public abstract class BaseAMRMProxyTest {
    * rest. So the responses returned can be less than the number of end points
    * specified
    * 
-   * @param testContext
+   * @param testContexts
    * @param func
    * @return
    */
@@ -697,11 +698,6 @@ public abstract class BaseAMRMProxyTest {
       return null;
     }
 
-    @Override
-    public QueuingContext getQueuingContext() {
-      return null;
-    }
-
     public boolean isDistributedSchedulingEnabled() {
       return false;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message