hadoop-common-commits mailing list archives

From su...@apache.org
Subject [16/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
Date Tue, 08 Nov 2016 22:46:21 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
new file mode 100644
index 0000000..8b4907b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+/**
+ * Context of a container inside the scheduler.
+ */
+public class SchedulerContainer<A extends SchedulerApplicationAttempt,
+    N extends SchedulerNode> {
+  private RMContainer rmContainer;
+  private String nodePartition;
+  private A schedulerApplicationAttempt;
+  private N schedulerNode;
+  private boolean allocated; // Allocated (True) or reserved (False)
+
+  public SchedulerContainer(A app, N node, RMContainer rmContainer,
+      String nodePartition, boolean allocated) {
+    this.schedulerApplicationAttempt = app;
+    this.schedulerNode = node;
+    this.rmContainer = rmContainer;
+    this.nodePartition = nodePartition;
+    this.allocated = allocated;
+  }
+
+  public String getNodePartition() {
+    return nodePartition;
+  }
+
+  public RMContainer getRmContainer() {
+    return rmContainer;
+  }
+
+  public A getSchedulerApplicationAttempt() {
+    return schedulerApplicationAttempt;
+  }
+
+  public N getSchedulerNode() {
+    return schedulerNode;
+  }
+
+  public boolean isAllocated() {
+    return allocated;
+  }
+
+  public SchedulerRequestKey getSchedulerRequestKey() {
+    if (rmContainer.getState() == RMContainerState.RESERVED) {
+      return rmContainer.getReservedSchedulerKey();
+    }
+    return rmContainer.getAllocatedSchedulerKey();
+  }
+
+  @Override
+  public String toString() {
+    return "(Application=" + schedulerApplicationAttempt
+        .getApplicationAttemptId() + "; Node=" + schedulerNode.getNodeID()
+        + "; Resource=" + rmContainer.getAllocatedOrReservedResource() + ")";
+  }
+}
\ No newline at end of file
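
SchedulerContainer above is an immutable snapshot that ties one RMContainer to the application attempt and node a proposal targets; the boolean flag distinguishes an allocation from a reservation. A minimal sketch of how a proposal might wrap one — the app, node and rmContainer objects are assumed to come from the scheduler, and the variable names are hypothetical, not code from this patch:

    // Hypothetical usage sketch: wrap a proposed allocation so the commit
    // protocol can validate and apply it later.
    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> proposed =
        new SchedulerContainer<>(app, node, rmContainer,
            node.getPartition(), true); // true = allocated, false = reserved

    // The request key is resolved from the reserved or the allocated key,
    // depending on the RMContainer's state.
    SchedulerRequestKey key = proposed.getSchedulerRequestKey();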

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index ebe70d4..6d9dda8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -18,14 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -51,12 +44,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@@ -70,11 +64,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCap
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -101,6 +109,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    */
   private String appSkipNodeDiagnostics;
 
+  private Map<ContainerId, SchedContainerChangeRequest> toBeRemovedIncRequests =
+      new ConcurrentHashMap<>();
+
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
@@ -193,11 +204,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
   }
 
-  public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+  public RMContainer allocate(FiCaSchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container container) {
     try {
-      writeLock.lock();
+      readLock.lock();
 
       if (isStopped) {
         return null;
@@ -216,41 +227,408 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           request.getNodeLabelExpression());
       ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
+      // FIXME: should only be set once the allocation is confirmed
       updateAMContainerDiagnostics(AMState.ASSIGNED, null);
 
-      // Add it to allContainers list.
-      newlyAllocatedContainers.add(rmContainer);
+      return rmContainer;
+    } finally {
+      readLock.unlock();
+    }
+  }
 
-      ContainerId containerId = container.getId();
-      liveContainers.put(containerId, rmContainer);
+  private boolean rmContainerInFinalState(RMContainer rmContainer) {
+    if (null == rmContainer) {
+      return false;
+    }
 
-      // Update consumption and track allocations
-      List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-          type, node, schedulerKey, request, container);
+    return rmContainer.completed();
+  }
 
-      attemptResourceUsage.incUsed(node.getPartition(),
-          container.getResource());
+  private boolean anyContainerInFinalState(
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request
+        .getContainersToRelease()) {
+      if (rmContainerInFinalState(c.getRmContainer())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("To-release container=" + c.getRmContainer()
+              + " is in final state");
+        }
+        return true;
+      }
+    }
 
-      // Update resource requests related to "request" and store in RMContainer
-      ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+    for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> c : request
+        .getContainersToAllocate()) {
+      for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> r : c
+          .getToRelease()) {
+        if (rmContainerInFinalState(r.getRmContainer())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("To-release container=" + r.getRmContainer()
+                + ", for a newly allocated container, is in final state");
+          }
+          return true;
+        }
+      }
 
-      // Inform the container
-      rmContainer.handle(
-          new RMContainerEvent(containerId, RMContainerEventType.START));
+      if (null != c.getAllocateFromReservedContainer()) {
+        if (rmContainerInFinalState(
+            c.getAllocateFromReservedContainer().getRmContainer())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Allocate-from-reserved container=" + c
+                .getAllocateFromReservedContainer().getRmContainer()
+                + " is in final state");
+          }
+          return true;
+        }
+      }
+    }
 
+    for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> c : request
+        .getContainersToReserve()) {
+      for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> r : c
+          .getToRelease()) {
+        if (rmContainerInFinalState(r.getRmContainer())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("To-release container=" + r.getRmContainer()
+                + ", for a reserved container, is in final state");
+          }
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  private SchedContainerChangeRequest getResourceChangeRequest(
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
+    return appSchedulingInfo.getIncreaseRequest(
+        schedulerContainer.getSchedulerNode().getNodeID(),
+        schedulerContainer.getSchedulerRequestKey(),
+        schedulerContainer.getRmContainer().getContainerId());
+  }
+
+  private boolean checkIncreaseContainerAllocation(
+      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
+    // When increasing a container
+    if (schedulerContainer.getRmContainer().getState()
+        != RMContainerState.RUNNING) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate: applicationAttemptId=" + containerId
-            .getApplicationAttemptId() + " container=" + containerId + " host="
-            + container.getNodeId().getHost() + " type=" + type);
+        LOG.debug("Trying to increase a container, but container="
+            + schedulerContainer.getRmContainer().getContainerId()
+            + " is not in running state.");
       }
-      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
-          "SchedulerApp", getApplicationId(), containerId,
-          container.getResource());
+      return false;
+    }
 
-      return rmContainer;
+    // Check if increase request is still valid
+    SchedContainerChangeRequest increaseRequest = getResourceChangeRequest(
+        schedulerContainer);
+
+    if (null == increaseRequest || !Resources.equals(
+        increaseRequest.getDeltaCapacity(),
+        allocation.getAllocatedOrReservedResource())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Increase request has been changed, reject this proposal");
+      }
+      return false;
+    }
+
+    if (allocation.getAllocateFromReservedContainer() != null) {
+      // In addition, if allocation is from a reserved container, check
+      // if the reserved container has enough reserved space
+      if (!Resources.equals(
+          allocation.getAllocateFromReservedContainer().getRmContainer()
+              .getReservedResource(), increaseRequest.getDeltaCapacity())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean commonCheckContainerAllocation(
+      Resource cluster,
+      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
+    // Make sure node is not reserved by anyone else
+    RMContainer reservedContainerOnNode =
+        schedulerContainer.getSchedulerNode().getReservedContainer();
+    if (reservedContainerOnNode != null) {
+      // The proposal may not allocate from a reserved container at all, so
+      // guard against a null allocate-from entry before dereferencing it
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> allocateFrom =
+          allocation.getAllocateFromReservedContainer();
+      RMContainer fromReservedContainer =
+          allocateFrom == null ? null : allocateFrom.getRmContainer();
+
+      if (fromReservedContainer != reservedContainerOnNode) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Trying to allocate from a non-existent reserved container");
+        }
+        return false;
+      }
+    }
+
+    // Do we have enough space on this node?
+    Resource availableResource = Resources.clone(
+        schedulerContainer.getSchedulerNode().getUnallocatedResource());
+
+    // If we have any to-release container in non-reserved state, they are
+    // from lazy-preemption, add their consumption to available resource
+    // of this node
+    if (allocation.getToRelease() != null && !allocation.getToRelease()
+        .isEmpty()) {
+      for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+          releaseContainer : allocation.getToRelease()) {
+        // Only consider non-reserved container (reserved container will
+        // not affect available resource of node) on the same node
+        if (releaseContainer.getRmContainer().getState()
+            != RMContainerState.RESERVED
+            && releaseContainer.getSchedulerNode() == schedulerContainer
+            .getSchedulerNode()) {
+          Resources.addTo(availableResource,
+              releaseContainer.getRmContainer().getAllocatedResource());
+        }
+      }
+    }
+    if (!Resources.fitsIn(rc, cluster,
+        allocation.getAllocatedOrReservedResource(),
+        availableResource)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Node doesn't have enough available resource, asked="
+            + allocation.getAllocatedOrReservedResource() + " available="
+            + availableResource);
+      }
+      return false;
+    }
+
+    return true;
+  }
+
+  public boolean accept(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    List<ResourceRequest> resourceRequests = null;
+    boolean reReservation = false;
+
+    try {
+      readLock.lock();
+
+      // First make sure no container in release list in final state
+      if (anyContainerInFinalState(request)) {
+        return false;
+      }
+
+      // TODO, make sure all scheduler nodes still exist
+      // TODO, make sure all node labels are unchanged
+
+      if (request.anythingAllocatedOrReserved()) {
+        /*
+         * 1) If this is a newly allocated container, check that the node is
+         *    not reserved by another application
+         * 2) If this is a newly reserved container, check whether the node is
+         *    already reserved
+         */
+        // Assume we have only one container allocated or reserved
+        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
+            allocation = request.getFirstAllocatedOrReservedContainer();
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = allocation.getAllocatedOrReservedContainer();
+
+        if (schedulerContainer.isAllocated()) {
+          if (!allocation.isIncreasedAllocation()) {
+            // When allocating a new container
+            resourceRequests =
+                schedulerContainer.getRmContainer().getResourceRequests();
+
+            // Check pending resource request
+            if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
+                schedulerContainer.getSchedulerNode(),
+                schedulerContainer.getSchedulerRequestKey())) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("No pending resource for: nodeType=" + allocation
+                    .getAllocationLocalityType() + ", node=" + schedulerContainer
+                    .getSchedulerNode() + ", requestKey=" + schedulerContainer
+                    .getSchedulerRequestKey() + ", application="
+                    + getApplicationAttemptId());
+              }
+
+              return false;
+            }
+          } else {
+            if (!checkIncreaseContainerAllocation(allocation,
+                schedulerContainer)) {
+              return false;
+            }
+          }
+
+          // Common checks for container allocation, regardless of whether it
+          // is an increased or a regular container; reject if they fail
+          if (!commonCheckContainerAllocation(cluster, allocation,
+              schedulerContainer)) {
+            return false;
+          }
+        } else {
+          // A container reserved for the first time will be in the NEW state;
+          // once the reservation is accepted & confirmed, it moves to RESERVED
+          if (schedulerContainer.getRmContainer().getState()
+              == RMContainerState.RESERVED) {
+            // Set reReservation == true
+            reReservation = true;
+          } else {
+            // When reserving a resource (state == NEW for a new container,
+            // state == RUNNING for an increased container), just check that
+            // the node is not already reserved by someone else
+            if (schedulerContainer.getSchedulerNode().getReservedContainer()
+                != null) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Trying to reserve a container, but the node is "
+                    + "already reserved by another container="
+                    + schedulerContainer.getSchedulerNode()
+                    .getReservedContainer().getContainerId());
+              }
+              return false;
+            }
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    // Skip checking the parent queue if this is a re-reservation
+    boolean accepted = true;
+    if (!reReservation) {
+      // Check parent if anything allocated or reserved
+      if (request.anythingAllocatedOrReserved()) {
+        accepted = getCSLeafQueue().accept(cluster, request);
+      }
+    }
+
+    // When rejected, recover resource requests for this app
+    if (!accepted && resourceRequests != null) {
+      recoverResourceRequestsForContainer(resourceRequests);
+    }
+
+    return accepted;
+  }
+
+  public void apply(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    boolean reReservation = false;
+
+    try {
+      writeLock.lock();
+
+      // If we allocated something
+      if (request.anythingAllocatedOrReserved()) {
+        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
+            allocation = request.getFirstAllocatedOrReservedContainer();
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = allocation.getAllocatedOrReservedContainer();
+        RMContainer rmContainer = schedulerContainer.getRmContainer();
+
+        reReservation =
+            (!schedulerContainer.isAllocated()) && (rmContainer.getState()
+                == RMContainerState.RESERVED);
+
+        // Generate a new containerId if this is not an increase allocation
+        // or a re-reservation
+        if (!allocation.isIncreasedAllocation()) {
+          if (rmContainer.getContainer().getId() == null) {
+            rmContainer.setContainerId(BuilderUtils
+                .newContainerId(getApplicationAttemptId(),
+                    getNewContainerId()));
+          }
+        }
+        ContainerId containerId = rmContainer.getContainerId();
+
+        if (schedulerContainer.isAllocated()) {
+          // This allocation is from a reserved container
+          // Unreserve it first
+          if (allocation.getAllocateFromReservedContainer() != null) {
+            RMContainer reservedContainer =
+                allocation.getAllocateFromReservedContainer().getRmContainer();
+            // Handling container allocation
+            // Did we previously reserve containers at this 'priority'?
+            unreserve(schedulerContainer.getSchedulerRequestKey(),
+                schedulerContainer.getSchedulerNode(), reservedContainer);
+          }
+
+          // Update this application for the allocated container
+          if (!allocation.isIncreasedAllocation()) {
+            // Allocate a new container
+            newlyAllocatedContainers.add(rmContainer);
+            liveContainers.put(containerId, rmContainer);
+
+            // Deduct pending resource requests
+            List<ResourceRequest> requests = appSchedulingInfo.allocate(
+                allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(),
+                schedulerContainer.getSchedulerRequestKey(),
+                schedulerContainer.getRmContainer().getContainer());
+            ((RMContainerImpl) rmContainer).setResourceRequests(requests);
+
+            attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
+                allocation.getAllocatedOrReservedResource());
+
+            rmContainer.handle(
+                new RMContainerEvent(containerId, RMContainerEventType.START));
+
+            // Inform the node
+            schedulerContainer.getSchedulerNode().allocateContainer(
+                rmContainer);
+
+            // Update locality statistics
+            incNumAllocatedContainers(allocation.getAllocationLocalityType(),
+                allocation.getRequestLocalityType());
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("allocate: applicationAttemptId=" + containerId
+                  .getApplicationAttemptId() + " container=" + containerId
+                  + " host=" + rmContainer.getAllocatedNode().getHost()
+                  + " type=" + allocation.getAllocationLocalityType());
+            }
+            RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+                "SchedulerApp", getApplicationId(), containerId,
+                allocation.getAllocatedOrReservedResource());
+          } else {
+            SchedContainerChangeRequest increaseRequest =
+                getResourceChangeRequest(schedulerContainer);
+
+            // Allocate resource for the increase request and notify the node
+            schedulerContainer.getSchedulerNode().increaseContainer(
+                increaseRequest.getContainerId(),
+                increaseRequest.getDeltaCapacity());
+
+            // OK, we can allocate this increase request
+            // Notify application
+            increaseContainer(increaseRequest);
+          }
+        } else {
+          if (!allocation.isIncreasedAllocation()) {
+            // If the rmContainer's state is already updated to RESERVED, this is
+            // a reReservation
+            reserve(schedulerContainer.getSchedulerRequestKey(),
+                schedulerContainer.getSchedulerNode(),
+                schedulerContainer.getRmContainer(),
+                schedulerContainer.getRmContainer().getContainer(),
+                reReservation);
+          } else {
+            SchedContainerChangeRequest increaseRequest =
+                getResourceChangeRequest(schedulerContainer);
+
+            reserveIncreasedContainer(
+                schedulerContainer.getSchedulerRequestKey(),
+                schedulerContainer.getSchedulerNode(),
+                increaseRequest.getRMContainer(),
+                increaseRequest.getDeltaCapacity());
+          }
+        }
+      }
     } finally {
       writeLock.unlock();
     }
+
+    // Don't bother CS leaf queue if it is a re-reservation
+    if (!reReservation) {
+      getCSLeafQueue().apply(cluster, request);
+    }
   }
 
   public boolean unreserve(SchedulerRequestKey schedulerKey,
@@ -347,9 +725,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * of the resources that will be allocated to and preempted from this
    * application.
    *
-   * @param resourceCalculator
-   * @param clusterResource
-   * @param minimumAllocation
+   * @param resourceCalculator the calculator used to compare resources
+   * @param clusterResource the total cluster resource
+   * @param minimumAllocation the minimum allocation unit
    * @return an allocation
    */
   public Allocation getAllocation(ResourceCalculator resourceCalculator,
@@ -386,45 +764,40 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public NodeId getNodeIdToUnreserve(
       SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
       ResourceCalculator rc, Resource clusterResource) {
-    try {
-      writeLock.lock();
-      // first go around make this algorithm simple and just grab first
-      // reservation that has enough resources
-      Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
-          schedulerKey);
-
-      if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
-        for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
-            .entrySet()) {
-          NodeId nodeId = entry.getKey();
-          RMContainer reservedContainer = entry.getValue();
-          if (reservedContainer.hasIncreaseReservation()) {
-            // Currently, only regular container allocation supports continuous
-            // reservation looking, we don't support canceling increase request
-            // reservation when allocating regular container.
-            continue;
-          }
-
-          Resource reservedResource = reservedContainer.getReservedResource();
+    // First go-around: keep this algorithm simple and just grab the first
+    // reservation that has enough resources
+    Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
+        schedulerKey);
+
+    if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
+      for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
+          .entrySet()) {
+        NodeId nodeId = entry.getKey();
+        RMContainer reservedContainer = entry.getValue();
+        if (reservedContainer.hasIncreaseReservation()) {
+          // Currently, only regular container allocation supports continuous
+          // reservation looking; we don't support canceling an increase-request
+          // reservation when allocating a regular container.
+          continue;
+        }
 
-          // make sure we unreserve one with at least the same amount of
-          // resources, otherwise could affect capacity limits
-          if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
-              reservedResource)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "unreserving node with reservation size: " + reservedResource
-                      + " in order to allocate container with size: "
-                      + resourceNeedUnreserve);
-            }
-            return nodeId;
+        Resource reservedResource = reservedContainer.getReservedResource();
+
+        // Make sure we unreserve one with at least the same amount of
+        // resources; otherwise we could affect capacity limits
+        if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
+            reservedResource)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "unreserving node with reservation size: " + reservedResource
+                    + " in order to allocate container with size: "
+                    + resourceNeedUnreserve);
           }
+          return nodeId;
         }
       }
-      return null;
-    } finally {
-      writeLock.unlock();
     }
+    return null;
   }
   
   public void setHeadroomProvider(
@@ -482,10 +855,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
-  public void reserve(SchedulerRequestKey schedulerKey,
-      FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
+  public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node,
+      RMContainer rmContainer, Container container, boolean reReservation) {
     // Update reserved metrics if this is the first reservation
-    if (rmContainer == null) {
+    // rmContainer will be moved to the reserved state in super.reserve
+    if (!reReservation) {
       queue.getMetrics().reserveResource(
           getUser(), container.getResource());
     }
@@ -501,35 +875,39 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public RMContainer findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       Resource minimumUnreservedResource) {
-    // need to unreserve some other container first
-    NodeId idToUnreserve =
-        getNodeIdToUnreserve(schedulerKey, minimumUnreservedResource,
-            rc, clusterResource);
-    if (idToUnreserve == null) {
+    try {
+      readLock.lock();
+      // need to unreserve some other container first
+      NodeId idToUnreserve = getNodeIdToUnreserve(schedulerKey,
+          minimumUnreservedResource, rc, clusterResource);
+      if (idToUnreserve == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Checked to see if we could unreserve for this app, but "
+              + "nothing reserved matches");
+        }
+        return null;
+      }
+      FiCaSchedulerNode nodeToUnreserve =
+          ((CapacityScheduler) scheduler).getNode(idToUnreserve);
+      if (nodeToUnreserve == null) {
+        LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
+        return null;
+      }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("checked to see if could unreserve for app but nothing "
-            + "reserved that matches for this app");
+        LOG.debug("unreserving for app: " + getApplicationId() + " on nodeId: "
+            + idToUnreserve
+            + " in order to replace reserved application and place it on node: "
+            + node.getNodeID() + " needing: " + minimumUnreservedResource);
       }
-      return null;
-    }
-    FiCaSchedulerNode nodeToUnreserve =
-        ((CapacityScheduler) scheduler).getNode(idToUnreserve);
-    if (nodeToUnreserve == null) {
-      LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
-      return null;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("unreserving for app: " + getApplicationId()
-        + " on nodeId: " + idToUnreserve
-        + " in order to replace reserved application and place it on node: "
-        + node.getNodeID() + " needing: " + minimumUnreservedResource);
-    }
 
-    // headroom
-    Resources.addTo(getHeadroom(), nodeToUnreserve
-        .getReservedContainer().getReservedResource());
+      // headroom
+      Resources.addTo(getHeadroom(),
+          nodeToUnreserve.getReservedContainer().getReservedResource());
 
-    return nodeToUnreserve.getReservedContainer();
+      return nodeToUnreserve.getReservedContainer();
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public LeafQueue getCSLeafQueue() {
@@ -537,7 +915,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
+      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode, RMContainer reservedContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("pre-assignContainers for application "
@@ -545,13 +923,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       showRequests();
     }
 
-    try {
-      writeLock.lock();
-      return containerAllocator.assignContainers(clusterResource, node,
-          schedulingMode, currentResourceLimits, reservedContainer);
-    } finally {
-      writeLock.unlock();
-    }
+    return containerAllocator.assignContainers(clusterResource, ps,
+        schedulingMode, currentResourceLimits, reservedContainer);
   }
 
   public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
@@ -626,13 +999,18 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   /**
    * Set the message temporarily if the reason why scheduling did not happen
    * for a given node is known; otherwise the message will be overwritten
-   * @param message
+   * @param message the app-skip diagnostics message
    */
   public void updateAppSkipNodeDiagnostics(String message) {
     this.appSkipNodeDiagnostics = message;
   }
 
   public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
+    // FIXME, update AM diagnostics when global scheduling is enabled
+    if (null == node) {
+      return;
+    }
+
     if (isWaitingForAMContainer()) {
       StringBuilder diagnosticMessageBldr = new StringBuilder();
       if (appSkipNodeDiagnostics != null) {
@@ -653,6 +1031,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet(
+      SchedulerRequestKey schedulerRequestKey) {
+    return super.getSchedulingPlacementSet(schedulerRequestKey);
+  }
+
   /**
    * Recalculates the per-app, percent of queue metric, specific to the
    * Capacity Scheduler.
@@ -690,4 +1075,29 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public ReentrantReadWriteLock.WriteLock getWriteLock() {
     return this.writeLock;
   }
+
+  public void addToBeRemovedIncreaseRequest(
+      SchedContainerChangeRequest request) {
+    toBeRemovedIncRequests.put(request.getContainerId(), request);
+  }
+
+  public void removedToBeRemovedIncreaseRequests() {
+    // Remove invalidated increase requests
+    if (!toBeRemovedIncRequests.isEmpty()) {
+      try {
+        writeLock.lock();
+        Iterator<Map.Entry<ContainerId, SchedContainerChangeRequest>> iter =
+            toBeRemovedIncRequests.entrySet().iterator();
+        while (iter.hasNext()) {
+          SchedContainerChangeRequest req = iter.next().getValue();
+          appSchedulingInfo.removeIncreaseRequest(req.getNodeId(),
+              req.getRMContainer().getAllocatedSchedulerKey(),
+              req.getContainerId());
+          iter.remove();
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  }
 }
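
The accept()/apply() pair added above is the heart of the new global scheduler interface: allocation proposals are produced optimistically, validated against current application state under the read lock (accept), and only then committed under the write lock (apply). A hedged sketch of the resulting try-commit flow — the clusterResource and request variables, and the surrounding commit loop, are assumptions, not code from this patch:

    // Hypothetical commit step: validate the proposal first, then apply it.
    // A proposal that raced with a concurrent change (node got reserved, a
    // container completed, the pending ask was already satisfied, ...) is
    // rejected by accept() and simply dropped.
    if (app.accept(clusterResource, request)) {
      app.apply(clusterResource, request); // commits allocation/reservation
    }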

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
new file mode 100644
index 0000000..d275bfd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+
+import java.util.List;
+
+public class FifoAppAttempt extends FiCaSchedulerApp {
+  private static final Log LOG = LogFactory.getLog(FifoAppAttempt.class);
+
+  FifoAppAttempt(ApplicationAttemptId appAttemptId, String user,
+      Queue queue, ActiveUsersManager activeUsersManager,
+      RMContext rmContext) {
+    super(appAttemptId, user, queue, activeUsersManager, rmContext);
+  }
+
+  public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+      SchedulerRequestKey schedulerKey, ResourceRequest request,
+      Container container) {
+    try {
+      writeLock.lock();
+
+      if (isStopped) {
+        return null;
+      }
+
+      // Required sanity check - AM can call 'allocate' to update resource
+      // request without locking the scheduler, hence we need to check
+      if (getTotalRequiredResources(schedulerKey) <= 0) {
+        return null;
+      }
+
+      // Create RMContainer
+      RMContainer rmContainer = new RMContainerImpl(container,
+          this.getApplicationAttemptId(), node.getNodeID(),
+          appSchedulingInfo.getUser(), this.rmContext,
+          request.getNodeLabelExpression());
+      ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
+
+      updateAMContainerDiagnostics(AMState.ASSIGNED, null);
+
+      // Add it to allContainers list.
+      newlyAllocatedContainers.add(rmContainer);
+
+      ContainerId containerId = container.getId();
+      liveContainers.put(containerId, rmContainer);
+
+      // Update consumption and track allocations
+      List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+          type, node, schedulerKey, request, container);
+
+      attemptResourceUsage.incUsed(node.getPartition(),
+          container.getResource());
+
+      // Update resource requests related to "request" and store in RMContainer
+      ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerEvent(containerId, RMContainerEventType.START));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate: applicationAttemptId=" + containerId
+            .getApplicationAttemptId() + " container=" + containerId + " host="
+            + container.getNodeId().getHost() + " type=" + type);
+      }
+      RMAuditLogger.logSuccess(getUser(),
+          RMAuditLogger.AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
+          getApplicationId(), containerId, container.getResource());
+
+      return rmContainer;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 92acf75..5ccde19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -103,7 +102,7 @@ import com.google.common.annotations.VisibleForTesting;
 @Evolving
 @SuppressWarnings("unchecked")
 public class FifoScheduler extends
-    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
+    AbstractYarnScheduler<FifoAppAttempt, FiCaSchedulerNode> implements
     Configurable {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -239,7 +238,7 @@ public class FifoScheduler extends
     validateConf(conf);
     //Use ConcurrentSkipListMap because applications need to be ordered
     this.applications =
-        new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+        new ConcurrentSkipListMap<>();
     this.minimumAllocation =
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -328,7 +327,7 @@ public class FifoScheduler extends
       List<String> blacklistAdditions, List<String> blacklistRemovals,
       List<UpdateContainerRequest> increaseRequests,
       List<UpdateContainerRequest> decreaseRequests) {
-    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
+    FifoAppAttempt application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
           "or non-existent application " + applicationAttemptId);
@@ -384,8 +383,8 @@ public class FifoScheduler extends
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
       String queue, String user, boolean isAppRecovering) {
-    SchedulerApplication<FiCaSchedulerApp> application =
-        new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
+    SchedulerApplication<FifoAppAttempt> application =
+        new SchedulerApplication<>(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
@@ -405,12 +404,12 @@ public class FifoScheduler extends
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt,
           boolean isAttemptRecovering) {
-    SchedulerApplication<FiCaSchedulerApp> application =
+    SchedulerApplication<FifoAppAttempt> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
     // TODO: Fix store
-    FiCaSchedulerApp schedulerApp =
-        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
+    FifoAppAttempt schedulerApp =
+        new FifoAppAttempt(appAttemptId, user, DEFAULT_QUEUE,
           activeUsersManager, this.rmContext);
 
     if (transferStateFromPreviousAttempt) {
@@ -436,7 +435,7 @@ public class FifoScheduler extends
 
   private synchronized void doneApplication(ApplicationId applicationId,
       RMAppState finalState) {
-    SchedulerApplication<FiCaSchedulerApp> application =
+    SchedulerApplication<FifoAppAttempt> application =
         applications.get(applicationId);
     if (application == null){
       LOG.warn("Couldn't find application " + applicationId);
@@ -454,8 +453,8 @@ public class FifoScheduler extends
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
       throws IOException {
-    FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
-    SchedulerApplication<FiCaSchedulerApp> application =
+    FifoAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
+    SchedulerApplication<FifoAppAttempt> application =
         applications.get(applicationAttemptId.getApplicationId());
     if (application == null || attempt == null) {
       throw new IOException("Unknown application " + applicationAttemptId + 
@@ -492,9 +491,9 @@ public class FifoScheduler extends
         " #applications=" + applications.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
+    for (Map.Entry<ApplicationId, SchedulerApplication<FifoAppAttempt>> e : applications
         .entrySet()) {
-      FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
+      FifoAppAttempt application = e.getValue().getCurrentAppAttempt();
       if (application == null) {
         continue;
       }
@@ -536,9 +535,9 @@ public class FifoScheduler extends
 
     // Update the applications' headroom to correctly take into
     // account the containers assigned in this update.
-    for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
-      FiCaSchedulerApp attempt =
-          (FiCaSchedulerApp) application.getCurrentAppAttempt();
+    for (SchedulerApplication<FifoAppAttempt> application : applications.values()) {
+      FifoAppAttempt attempt =
+          (FifoAppAttempt) application.getCurrentAppAttempt();
       if (attempt == null) {
         continue;
       }
@@ -546,7 +545,7 @@ public class FifoScheduler extends
     }
   }
 
-  private int getMaxAllocatableContainers(FiCaSchedulerApp application,
+  private int getMaxAllocatableContainers(FifoAppAttempt application,
       SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, NodeType type) {
     int maxContainers = 0;
 
@@ -585,7 +584,7 @@ public class FifoScheduler extends
 
 
   private int assignContainersOnNode(FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey
+      FifoAppAttempt application, SchedulerRequestKey schedulerKey
   ) {
     // Data-local
     int nodeLocalContainers =
@@ -612,7 +611,7 @@ public class FifoScheduler extends
   }
 
   private int assignNodeLocalContainers(FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
+      FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
     ResourceRequest request =
         application.getResourceRequest(schedulerKey, node.getNodeName());
@@ -638,7 +637,7 @@ public class FifoScheduler extends
   }
 
   private int assignRackLocalContainers(FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
+      FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
     ResourceRequest request =
         application.getResourceRequest(schedulerKey, node.getRMNode()
@@ -664,7 +663,7 @@ public class FifoScheduler extends
   }
 
   private int assignOffSwitchContainers(FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
+      FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
     ResourceRequest request =
         application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
@@ -676,7 +675,7 @@ public class FifoScheduler extends
     return assignedContainers;
   }
 
-  private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
+  private int assignContainer(FiCaSchedulerNode node, FifoAppAttempt application,
       SchedulerRequestKey schedulerKey, int assignableContainers,
       ResourceRequest request, NodeType type) {
     LOG.debug("assignContainers:" +
@@ -710,8 +709,8 @@ public class FifoScheduler extends
         // Allocate!
         
         // Inform the application
-        RMContainer rmContainer =
-            application.allocate(type, node, schedulerKey, request, container);
+        RMContainer rmContainer = application.allocate(type, node, schedulerKey,
+            request, container);
         
         // Inform the node
         node.allocateContainer(rmContainer);
@@ -836,7 +835,7 @@ public class FifoScheduler extends
 
     // Get the application for the finished container
     Container container = rmContainer.getContainer();
-    FiCaSchedulerApp application =
+    FifoAppAttempt application =
         getCurrentAttemptForContainer(container.getId());
     ApplicationId appId =
         container.getId().getApplicationAttemptId().getApplicationId();
@@ -916,7 +915,7 @@ public class FifoScheduler extends
 
   @Override
   public RMContainer getRMContainer(ContainerId containerId) {
-    FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+    FifoAppAttempt attempt = getCurrentAttemptForContainer(containerId);
     return (attempt == null) ? null : attempt.getRMContainer(containerId);
   }
 
@@ -937,7 +936,7 @@ public class FifoScheduler extends
     if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
       List<ApplicationAttemptId> attempts =
           new ArrayList<ApplicationAttemptId>(applications.size());
-      for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
+      for (SchedulerApplication<FifoAppAttempt> app : applications.values()) {
         attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
       }
       return attempts;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java
new file mode 100644
index 0000000..2e6c3ca
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * <p>
+ * PlacementSet is the central place that decides the order of nodes used to
+ * satisfy asks from an application.
+ * </p>
+ *
+ * <p>
+ * Also, a PlacementSet can cache results (for example, an ordered list) for
+ * better performance.
+ * </p>
+ *
+ * <p>
+ * PlacementSet can depend on one or more other PlacementSets.
+ * </p>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface PlacementSet<N extends SchedulerNode> {
+  /**
+   * Get all nodes for this PlacementSet
+   * @return all nodes for this PlacementSet
+   */
+  Map<NodeId, N> getAllNodes();
+
+  /**
+   * Version of the PlacementSet; helps a PlacementSet that depends on this
+   * one decide whether an update is required.
+   * @return version
+   */
+  long getVersion();
+
+  /**
+   * Partition of the PlacementSet.
+   * @return node partition
+   */
+  String getPartition();
+}
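
For orientation, here is a minimal sketch of how a component that depends on
a PlacementSet might use getVersion() to avoid recomputing work. The
CachedNodeOrder class is illustrative only (not part of this patch), and it
assumes the PlacementSet and SchedulerNode types above are on the classpath:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical consumer that caches per-version work: it re-derives
    // its node list only when the upstream PlacementSet's version changes.
    public class CachedNodeOrder<N extends SchedulerNode> {
      private long cachedVersion = -1;
      private List<N> cachedOrder;

      public synchronized List<N> orderedNodes(PlacementSet<N> ps) {
        if (cachedOrder == null || ps.getVersion() != cachedVersion) {
          cachedOrder = new ArrayList<>(ps.getAllNodes().values());
          cachedVersion = ps.getVersion();
        }
        return cachedOrder;
      }
    }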

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java
new file mode 100644
index 0000000..405122b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+public class PlacementSetUtils {
+  /**
+   * If the {@link PlacementSet} has exactly one entry, return it; otherwise
+   * return null.
+   */
+  public static <N extends SchedulerNode> N getSingleNode(PlacementSet<N> ps) {
+    N node = null;
+    if (1 == ps.getAllNodes().size()) {
+      node = ps.getAllNodes().values().iterator().next();
+    }
+
+    return node;
+  }
+}
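
As a usage sketch (the surrounding allocation method and the helper names
allocateOn/allocateFromCandidates are assumed for illustration, not from
this patch):

    // Fast path: a heartbeat-driven PlacementSet usually wraps exactly one
    // node, so allocate directly against it; otherwise iterate candidates.
    FiCaSchedulerNode single = PlacementSetUtils.getSingleNode(placementSet);
    if (single != null) {
      allocateOn(single);                     // hypothetical helper
    } else {
      allocateFromCandidates(placementSet);   // hypothetical helper
    }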

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java
new file mode 100644
index 0000000..da356f5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+
+/**
+ * Result of a ResourceRequest update.
+ */
+public class ResourceRequestUpdateResult {
+  private final ResourceRequest lastAnyResourceRequest;
+  private final ResourceRequest newResourceRequest;
+
+  public ResourceRequestUpdateResult(ResourceRequest lastAnyResourceRequest,
+      ResourceRequest newResourceRequest) {
+    this.lastAnyResourceRequest = lastAnyResourceRequest;
+    this.newResourceRequest = newResourceRequest;
+  }
+
+  public ResourceRequest getLastAnyResourceRequest() {
+    return lastAnyResourceRequest;
+  }
+
+  public ResourceRequest getNewResourceRequest() {
+    return newResourceRequest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
new file mode 100644
index 0000000..f87f764
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <p>
+ * In addition to {@link PlacementSet}, this also maintains pending
+ * ResourceRequests: the scheduler notifies the corresponding PlacementSet
+ * when new ResourceRequests are added, or when a new container is
+ * allocated.
+ * </p>
+ *
+ * <p>
+ * Different sets of resource requests (e.g., resource requests with the
+ * same schedulerKey) can share one instance of PlacementSet; each
+ * PlacementSet can order nodes differently depending on the requests.
+ * </p>
+ */
+public interface SchedulingPlacementSet<N extends SchedulerNode>
+    extends PlacementSet<N> {
+  /**
+   * Get an iterator over preferred nodes, depending on requirements and/or
+   * availability.
+   * @param clusterPlacementSet input cluster PlacementSet
+   * @return iterator over preferred nodes
+   */
+  Iterator<N> getPreferredNodeIterator(PlacementSet<N> clusterPlacementSet);
+
+  /**
+   * Replace existing ResourceRequests with the new requests.
+   *
+   * @param requests new ResourceRequests
+   * @param recoverPreemptedRequestForAContainer true if we are recovering
+   * resource requests for a preempted container
+   * @return update result containing the last ANY ResourceRequest and the
+   * new ResourceRequest
+   */
+  ResourceRequestUpdateResult updateResourceRequests(
+      List<ResourceRequest> requests,
+      boolean recoverPreemptedRequestForAContainer);
+
+  /**
+   * Get all pending ResourceRequests for this placement set.
+   * @return Map of resourceName to ResourceRequest
+   */
+  Map<String, ResourceRequest> getResourceRequests();
+
+  /**
+   * Get the ResourceRequest for a given resourceName and schedulerRequestKey.
+   * @param resourceName resource name (host, rack, or ANY)
+   * @param schedulerRequestKey key identifying the request
+   * @return the matching ResourceRequest
+   */
+  ResourceRequest getResourceRequest(String resourceName,
+      SchedulerRequestKey schedulerRequestKey);
+
+  /**
+   * Notify that a container has been allocated.
+   * @param type type of the allocation
+   * @param node the node the container was allocated on
+   * @param request resource request
+   * @return list of ResourceRequests deducted
+   */
+  List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
+      ResourceRequest request);
+}
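
A rough sketch of the call sequence a scheduler might follow against this
interface; everything outside the interface's own methods (the variable
names, the schedulerKey, and the NODE_LOCAL choice) is assumed for
illustration:

    // 1) Push updated asks from the application into the placement set.
    ResourceRequestUpdateResult result =
        sps.updateResourceRequests(newRequests, false);

    // 2) Walk candidate nodes in the set's preferred order.
    Iterator<FiCaSchedulerNode> it =
        sps.getPreferredNodeIterator(clusterPlacementSet);
    while (it.hasNext()) {
      FiCaSchedulerNode node = it.next();
      ResourceRequest rr =
          sps.getResourceRequest(node.getNodeName(), schedulerKey);
      if (rr != null && rr.getNumContainers() > 0) {
        // 3) Tell the set a container was placed so it deducts the ask.
        sps.allocate(NodeType.NODE_LOCAL, node, rr);
        break;
      }
    }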

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
new file mode 100644
index 0000000..48efaa1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A simple PlacementSet that keeps an unordered map of nodes.
+ */
+public class SimplePlacementSet<N extends SchedulerNode>
+    implements PlacementSet<N> {
+
+  private Map<NodeId, N> map;
+  private String partition;
+
+  public SimplePlacementSet(N node) {
+    if (null != node) {
+      // Only one node in the initial PlacementSet
+      this.map = ImmutableMap.of(node.getNodeID(), node);
+      this.partition = node.getPartition();
+    } else {
+      this.map = Collections.emptyMap();
+      this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION;
+    }
+  }
+
+  public SimplePlacementSet(Map<NodeId, N> map, String partition) {
+    this.map = map;
+    this.partition = partition;
+  }
+
+  @Override
+  public Map<NodeId, N> getAllNodes() {
+    return map;
+  }
+
+  @Override
+  public long getVersion() {
+    return 0L;
+  }
+
+  @Override
+  public String getPartition() {
+    return partition;
+  }
+}
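
Two usage sketches, assuming a FiCaSchedulerNode named node is in scope
(both constructors are taken from the class above):

    // Single-node form: wraps one heartbeating node, so existing
    // one-node-at-a-time scheduling logic can run against PlacementSet.
    SimplePlacementSet<FiCaSchedulerNode> one = new SimplePlacementSet<>(node);

    // Multi-node form: wraps a partition's node map under a given label.
    SimplePlacementSet<FiCaSchedulerNode> many = new SimplePlacementSet<>(
        ImmutableMap.of(node.getNodeID(), node), node.getPartition());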

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index 7bec03a..b7cb1bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -35,7 +37,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
   
   private static final Log LOG = LogFactory.getLog(OrderingPolicy.class);
                                             
-  protected TreeSet<S> schedulableEntities;
+  protected ConcurrentSkipListSet<S> schedulableEntities;
   protected Comparator<SchedulableEntity> comparator;
   protected Map<String, S> entitiesToReorder = new HashMap<String, S>();
   

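The switch from TreeSet to ConcurrentSkipListSet here (and in the ordering
policies below) keeps the same comparator-defined ordering but makes
iteration safe while other threads add or remove entities, which matters
once allocation can proceed asynchronously; a TreeSet iterator fails fast
with ConcurrentModificationException. A self-contained sketch of that
property, using toy integer entities rather than scheduler types:

    import java.util.Comparator;
    import java.util.concurrent.ConcurrentSkipListSet;

    public class SkipListDemo {
      public static void main(String[] args) throws InterruptedException {
        // Comparator-ordered like a TreeSet, but with weakly consistent
        // iterators: traversal never throws
        // ConcurrentModificationException while another thread mutates.
        ConcurrentSkipListSet<Integer> entities =
            new ConcurrentSkipListSet<>(Comparator.naturalOrder());
        for (int i = 0; i < 1000; i++) {
          entities.add(i);
        }
        Thread mutator = new Thread(() -> {
          for (int i = 1000; i < 2000; i++) {
            entities.add(i);
            entities.remove(i - 1000);
          }
        });
        mutator.start();
        for (Integer ignored : entities) {
          // Safe concurrent traversal; a TreeSet would likely fail here.
        }
        mutator.join();
      }
    }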
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
index 3cfcd7a..3371df8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -61,7 +62,7 @@ public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
       comparators
       );
     this.comparator = fairComparator;
-    this.schedulableEntities = new TreeSet<S>(comparator);
+    this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator);
   }
 
   private double getMagnitude(SchedulableEntity r) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
index 10f8eeb..2d066bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 
 /**
@@ -32,7 +34,7 @@ public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
     comparators.add(new PriorityComparator());
     comparators.add(new FifoComparator());
     this.comparator = new CompoundComparator(comparators);
-    this.schedulableEntities = new TreeSet<S>(comparator);
+    this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator);
 
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
index 0891289..6ced9e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 import java.util.*;
 
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import java.util.concurrent.ConcurrentSkipListSet;
 
 /**
  * This ordering policy is used for pending applications only.
@@ -46,7 +47,7 @@ public class FifoOrderingPolicyForPendingApps<S extends SchedulableEntity>
     comparators.add(new PriorityComparator());
     comparators.add(new FifoComparator());
     this.comparator = new CompoundComparator(comparators);
-    this.schedulableEntities = new TreeSet<S>(comparator);
+    this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 58bb721..3861624 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
@@ -167,6 +168,28 @@ public class MockRM extends ResourceManager {
     }
   }
 
+  private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates)
+      throws InterruptedException {
+    RMApp app = getRMContext().getRMApps().get(appId);
+    Assert.assertNotNull("app shouldn't be null", app);
+    final int timeoutMsecs = 80 * SECOND;
+    int timeWaiting = 0;
+    while (!finalStates.contains(app.getState())) {
+      if (timeWaiting >= timeoutMsecs) {
+        break;
+      }
+
+      LOG.info("App : " + appId + " State is : " + app.getState() +
+          " Waiting for state : " + finalStates);
+      Thread.sleep(WAIT_MS_PER_LOOP);
+      timeWaiting += WAIT_MS_PER_LOOP;
+    }
+
+    LOG.info("App State is : " + app.getState());
+    Assert.assertTrue("App State is not correct (timeout).",
+        finalStates.contains(app.getState()));
+  }
+
   /**
    * Wait until an application has reached a specified state.
    * The timeout is 80 seconds.
@@ -254,7 +277,7 @@ public class MockRM extends ResourceManager {
       RMAppAttemptState finalState, int timeoutMsecs)
       throws InterruptedException {
     int timeWaiting = 0;
-    while (!finalState.equals(attempt.getAppAttemptState())) {
+    while (finalState != attempt.getAppAttemptState()) {
       if (timeWaiting >= timeoutMsecs) {
         break;
       }
@@ -267,7 +290,7 @@ public class MockRM extends ResourceManager {
 
     LOG.info("Attempt State is : " + attempt.getAppAttemptState());
     Assert.assertEquals("Attempt state is not correct (timeout).", finalState,
-      attempt.getState());
+        attempt.getState());
   }
 
   public void waitForContainerToComplete(RMAppAttempt attempt,
@@ -966,6 +989,26 @@ public class MockRM extends ResourceManager {
         rm.getResourceScheduler()).getApplicationAttempt(attemptId));
   }
 
+  public static MockAM launchAMWhenAsyncSchedulingEnabled(RMApp app, MockRM rm)
+      throws Exception {
+    // Poll for the attempt (async scheduling has no synchronous heartbeat
+    // to drive it); bound the wait to 100 * 50ms = 5 seconds.
+    int i = 0;
+    while (app.getCurrentAppAttempt() == null && i < 100) {
+      i++;
+      Thread.sleep(50);
+    }
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+
+    rm.waitForState(attempt.getAppAttemptId(),
+        RMAppAttemptState.ALLOCATED);
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+
+    return am;
+  }
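
A typical call site in an async-scheduling test might look like the
following; the conf object and GB constant are assumed from the surrounding
test class, so this is illustrative only:

    // Submit an app, then use the polling helper, since async scheduling
    // provides no synchronous heartbeat to drive the AM launch.
    MockRM rm = new MockRM(conf);
    rm.start();
    RMApp app = rm.submitApp(1 * GB);
    MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(app, rm);
    am.registerAppAttempt();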
+
   /**
    * NOTE: nm.nodeHeartbeat is explicitly invoked,
    * don't invoke it before calling launchAM

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 884e236..e48d9d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -1091,7 +1091,7 @@ public class TestClientRMService {
         rmContext.getScheduler().getSchedulerAppInfo(attemptId)
             .getLiveContainers()).thenReturn(rmContainers);
     ContainerStatus cs = mock(ContainerStatus.class);
-    when(containerimpl.getFinishedStatus()).thenReturn(cs);
+    when(containerimpl.completed()).thenReturn(false);
     when(containerimpl.getDiagnosticsInfo()).thenReturn("N/A");
     when(containerimpl.getContainerExitStatus()).thenReturn(0);
     when(containerimpl.getContainerState()).thenReturn(ContainerState.COMPLETE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
index 56d38fb..83a354d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
@@ -238,8 +238,10 @@ public class TestSchedulerHealth {
     SchedulerHealth sh =
         ((CapacityScheduler) resourceManager.getResourceScheduler())
           .getSchedulerHealth();
-    Assert.assertEquals(2, sh.getAllocationCount().longValue());
-    Assert.assertEquals(Resource.newInstance(3 * 1024, 2),
+    // SchedulerHealth now records only the last container allocated; the
+    // aggregate allocation count is unchanged
+    Assert.assertEquals(1, sh.getAllocationCount().longValue());
+    Assert.assertEquals(Resource.newInstance(1 * 1024, 1),
       sh.getResourcesAllocated());
     Assert.assertEquals(2, sh.getAggregateAllocationCount().longValue());
     Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 865449f..0aeedce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -134,6 +134,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -3453,7 +3454,7 @@ public class TestCapacityScheduler {
     scheduler.handle(new NodeRemovedSchedulerEvent(
         rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
     // schedulerNode is removed, try allocate a container
-    scheduler.allocateContainersToNode(node);
+    scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true);
 
     AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
         new AppAttemptRemovedSchedulerEvent(
@@ -3699,4 +3700,57 @@ public class TestCapacityScheduler {
     cs.handle(addAttemptEvent1);
     return appAttemptId1;
   }
+
+  @Test
+  public void testAppAttemptLocalityStatistics() throws Exception {
+    Configuration conf =
+        TestUtils.getConfigurationWithMultipleQueues(new Configuration(false));
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    MockRM rm = new MockRM(conf) {
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.start();
+    MockNM nm1 =
+        new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    // Launch app1 in queue=a
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
+
+    // Launching the AM produces one off-switch request and allocation
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    // am1 asks for 1 GB resource on h1/default-rack/offswitch
+    am1.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(1 * GB), 2), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 2), ResourceRequest
+        .newInstance(Priority.newInstance(1), "h1",
+            Resources.createResource(1 * GB), 1)), null);
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+
+    // First heartbeat: one node-local request and a node-local allocation
+    cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId()));
+
+    // Second heartbeat: the node-local request gets a rack-local allocation
+    cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId()));
+
+    RMAppAttemptMetrics attemptMetrics = rm.getRMContext().getRMApps().get(
+        app1.getApplicationId()).getCurrentAppAttempt()
+        .getRMAppAttemptMetrics();
+
+    // We should get one node-local allocation, one rack-local allocation,
+    // and one off-switch allocation
+    Assert.assertArrayEquals(new int[][] { { 1, 0, 0 }, { 0, 1, 0 }, { 0, 0, 1 } },
+        attemptMetrics.getLocalityStatistics());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

