hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [15/15] hadoop git commit: YARN-5938. Refactoring OpportunisticContainerAllocator to use SchedulerRequestKey instead of Priority and other misc fixes (asuresh)
Date Fri, 06 Jan 2017 19:34:45 GMT
YARN-5938. Refactoring OpportunisticContainerAllocator to use SchedulerRequestKey instead of Priority and other misc fixes (asuresh)

(cherry picked from commit ac1e5d4f77e3b9df8dcacb0b1f72eecc27931eb8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81da7d1d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81da7d1d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81da7d1d

Branch: refs/heads/branch-2
Commit: 81da7d1d3084f8f6cc004193e351fc678435705c
Parents: 2b4d3e8
Author: Arun Suresh <asuresh@apache.org>
Authored: Tue Dec 27 11:54:57 2016 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Fri Jan 6 11:25:49 2017 -0800

----------------------------------------------------------------------
 .../api/records/UpdateContainerRequest.java     |  11 +
 .../impl/pb/AllocateResponsePBImpl.java         |   4 +-
 .../server/api/protocolrecords/RemoteNode.java  |   7 +
 .../OpportunisticContainerAllocator.java        |  61 +--
 .../OpportunisticContainerContext.java          |  38 +-
 .../server/scheduler/SchedulerRequestKey.java   | 130 ++++++
 .../scheduler/DistributedScheduler.java         |  11 +-
 .../ApplicationMasterService.java               | 410 ++++++++++---------
 ...pportunisticContainerAllocatorAMService.java |  45 +-
 .../rmcontainer/RMContainer.java                |   3 +-
 .../rmcontainer/RMContainerImpl.java            |   2 +-
 .../rmcontainer/RMContainerReservedEvent.java   |   2 +-
 .../scheduler/AppSchedulingInfo.java            |   2 +
 .../scheduler/SchedulerApplicationAttempt.java  |   4 +-
 .../scheduler/SchedulerNode.java                |   1 +
 .../scheduler/SchedulerRequestKey.java          | 122 ------
 .../scheduler/capacity/LeafQueue.java           |   7 +-
 .../allocator/IncreaseContainerAllocator.java   |   2 +-
 .../allocator/RegularContainerAllocator.java    |   2 +-
 .../scheduler/common/SchedulerContainer.java    |   2 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |   3 +-
 .../common/fica/FiCaSchedulerNode.java          |   2 +-
 .../distributed/NodeQueueLoadMonitor.java       |  16 +-
 .../scheduler/fair/FSAppAttempt.java            |   2 +-
 .../scheduler/fair/FSSchedulerNode.java         |   2 +-
 .../scheduler/fifo/FifoAppAttempt.java          |   4 +-
 .../scheduler/fifo/FifoScheduler.java           |   4 +-
 .../LocalitySchedulingPlacementSet.java         |  16 +-
 .../placement/SchedulingPlacementSet.java       |   2 +-
 .../server/resourcemanager/Application.java     |   2 +-
 .../yarn/server/resourcemanager/Task.java       |   2 +-
 ...alCapacityPreemptionPolicyMockFramework.java |   3 +-
 ...estProportionalCapacityPreemptionPolicy.java |   3 +-
 .../scheduler/TestAppSchedulingInfo.java        |   1 +
 .../TestSchedulerApplicationAttempt.java        |   2 +
 .../capacity/TestCapacityScheduler.java         |   2 +-
 .../scheduler/capacity/TestLeafQueue.java       |   2 +-
 .../scheduler/capacity/TestReservations.java    |   2 +-
 .../scheduler/capacity/TestUtils.java           |   2 +-
 .../fair/TestContinuousScheduling.java          |   2 +-
 .../scheduler/fair/TestFSAppAttempt.java        |   2 +-
 .../fair/TestFairSchedulerPreemption.java       |   3 +-
 42 files changed, 502 insertions(+), 443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
index 200dea3..e4f7a82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
@@ -159,6 +159,17 @@ public abstract class UpdateContainerRequest extends AbstractResourceRequest {
   }
 
   @Override
+  public String toString() {
+    return "UpdateReq{" +
+        "containerId=" + getContainerId() + ", " +
+        "containerVersion=" + getContainerVersion() + ", " +
+        "targetExecType=" + getExecutionType() + ", " +
+        "targetCapability=" + getCapability() + ", " +
+        "updateType=" + getContainerUpdateType() + ", " +
+        "}";
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index bb50671..b424839 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -282,8 +282,8 @@ public class AllocateResponsePBImpl extends AllocateResponse {
       final List<Container> containers) {
     if (containers == null)
       return;
-    // this looks like a bug because it results in append and not set
     initLocalNewContainerList();
+    allocatedContainers.clear();
     allocatedContainers.addAll(containers);
   }
 
@@ -299,6 +299,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     if (containers == null)
       return;
     initLocalUpdatedContainerList();
+    updatedContainers.clear();
     updatedContainers.addAll(containers);
   }
 
@@ -315,6 +316,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     if (containers == null)
       return;
     initLocalFinishedContainerList();
+    completedContainersStatuses.clear();
     completedContainersStatuses.addAll(containers);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
index 2b76257..e403a12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -87,4 +87,11 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
   public int compareTo(RemoteNode other) {
     return this.getNodeId().compareTo(other.getNodeId());
   }
+
+  @Override
+  public String toString() {
+    return "RemoteNode{" +
+        "nodeId=" + getNodeId() + ", " +
+        "httpAddress=" + getHttpAddress() + "}";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 16436bd..c1300b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -22,8 +22,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -192,7 +199,8 @@ public class OpportunisticContainerAllocator {
 
   /**
    * Allocate OPPORTUNISTIC containers.
-   * @param request AllocateRequest
+   * @param blackList Resource BlackList Request
+   * @param oppResourceReqs Opportunistic Resource Requests
    * @param applicationAttemptId ApplicationAttemptId
    * @param opportContext App specific OpportunisticContainerContext
    * @param rmIdentifier RM Identifier
@@ -200,32 +208,24 @@ public class OpportunisticContainerAllocator {
    * @return List of Containers.
    * @throws YarnException YarnException
    */
-  public List<Container> allocateContainers(
-      AllocateRequest request, ApplicationAttemptId applicationAttemptId,
+  public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
+      List<ResourceRequest> oppResourceReqs,
+      ApplicationAttemptId applicationAttemptId,
       OpportunisticContainerContext opportContext, long rmIdentifier,
       String appSubmitter) throws YarnException {
-    // Update released containers.
-    List<ContainerId> releasedContainers = request.getReleaseList();
-    int numReleasedContainers = releasedContainers.size();
-    if (numReleasedContainers > 0) {
-      LOG.info("AttemptID: " + applicationAttemptId + " released: "
-          + numReleasedContainers);
-      opportContext.getContainersAllocated().removeAll(releasedContainers);
-    }
 
     // Update black list.
-    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
-    if (rbr != null) {
-      opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
-      opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
+    if (blackList != null) {
+      opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
+      opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
     }
 
     // Add OPPORTUNISTIC requests to the outstanding ones.
-    opportContext.addToOutstandingReqs(request.getAskList());
+    opportContext.addToOutstandingReqs(oppResourceReqs);
 
     // Satisfy the outstanding OPPORTUNISTIC requests.
     List<Container> allocatedContainers = new ArrayList<>();
-    for (Priority priority :
+    for (SchedulerRequestKey schedulerKey :
         opportContext.getOutstandingOpReqs().descendingKeySet()) {
       // Allocated containers :
       //  Key = Requested Capability,
@@ -234,7 +234,7 @@ public class OpportunisticContainerAllocator {
       //          we need the requested capability (key) to match against
       //          the outstanding reqs)
       Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
-          opportContext, priority, applicationAttemptId, appSubmitter);
+          opportContext, schedulerKey, applicationAttemptId, appSubmitter);
       for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
         opportContext.matchAllocationToOutstandingRequest(
             e.getKey(), e.getValue());
@@ -246,19 +246,22 @@ public class OpportunisticContainerAllocator {
   }
 
   private Map<Resource, List<Container>> allocate(long rmIdentifier,
-      OpportunisticContainerContext appContext, Priority priority,
+      OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
       ApplicationAttemptId appAttId, String userName) throws YarnException {
     Map<Resource, List<Container>> containers = new HashMap<>();
     for (ResourceRequest anyAsk :
-        appContext.getOutstandingOpReqs().get(priority).values()) {
+        appContext.getOutstandingOpReqs().get(schedKey).values()) {
       allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
           appContext.getContainerIdGenerator(), appContext.getBlacklist(),
           appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
-      LOG.info("Opportunistic allocation requested for ["
-          + "priority=" + anyAsk.getPriority()
-          + ", num_containers=" + anyAsk.getNumContainers()
-          + ", capability=" + anyAsk.getCapability() + "]"
-          + " allocated = " + containers.get(anyAsk.getCapability()).size());
+      if (!containers.isEmpty()) {
+        LOG.info("Opportunistic allocation requested for ["
+            + "priority=" + anyAsk.getPriority()
+            + ", allocationRequestId=" + anyAsk.getAllocationRequestId()
+            + ", num_containers=" + anyAsk.getNumContainers()
+            + ", capability=" + anyAsk.getCapability() + "]"
+            + " allocated = " + containers.keySet());
+      }
     }
     return containers;
   }
@@ -282,7 +285,9 @@ public class OpportunisticContainerAllocator {
       nodesForScheduling.add(nodeEntry.getValue());
     }
     if (nodesForScheduling.isEmpty()) {
-      LOG.warn("No nodes available for allocating opportunistic containers.");
+      LOG.warn("No nodes available for allocating opportunistic containers. [" +
+          "allNodes=" + allNodes + ", " +
+          "blacklist=" + blacklist + "]");
       return;
     }
     int numAllocated = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index 725e2d9..a2f9f4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -18,12 +18,7 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
@@ -52,9 +47,6 @@ public class OpportunisticContainerContext {
   private static final Logger LOG = LoggerFactory
       .getLogger(OpportunisticContainerContext.class);
 
-  // Currently just used to keep track of allocated containers.
-  // Can be used for reporting stats later.
-  private Set<ContainerId> containersAllocated = new HashSet<>();
   private AllocationParams appParams =
       new AllocationParams();
   private ContainerIdGenerator containerIdGenerator =
@@ -69,13 +61,9 @@ public class OpportunisticContainerContext {
   // Resource Name (host/rack/any) and capability. This mapping is required
   // to match a received Container to an outstanding OPPORTUNISTIC
   // ResourceRequest (ask).
-  private final TreeMap<Priority, Map<Resource, ResourceRequest>>
+  private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
       outstandingOpReqs = new TreeMap<>();
 
-  public Set<ContainerId> getContainersAllocated() {
-    return containersAllocated;
-  }
-
   public AllocationParams getAppParams() {
     return appParams;
   }
@@ -119,20 +107,11 @@ public class OpportunisticContainerContext {
     return blacklist;
   }
 
-  public TreeMap<Priority, Map<Resource, ResourceRequest>>
+  public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
       getOutstandingOpReqs() {
     return outstandingOpReqs;
   }
 
-  public void updateCompletedContainers(AllocateResponse allocateResponse) {
-    for (ContainerStatus cs :
-        allocateResponse.getCompletedContainersStatuses()) {
-      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        containersAllocated.remove(cs.getContainerId());
-      }
-    }
-  }
-
   /**
    * Takes a list of ResourceRequests (asks), extracts the key information viz.
    * (Priority, ResourceName, Capability) and adds to the outstanding
@@ -144,7 +123,7 @@ public class OpportunisticContainerContext {
    */
   public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
     for (ResourceRequest request : resourceAsks) {
-      Priority priority = request.getPriority();
+      SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
 
       // TODO: Extend for Node/Rack locality. We only handle ANY requests now
       if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
@@ -156,10 +135,10 @@ public class OpportunisticContainerContext {
       }
 
       Map<Resource, ResourceRequest> reqMap =
-          outstandingOpReqs.get(priority);
+          outstandingOpReqs.get(schedulerKey);
       if (reqMap == null) {
         reqMap = new HashMap<>();
-        outstandingOpReqs.put(priority, reqMap);
+        outstandingOpReqs.put(schedulerKey, reqMap);
       }
 
       ResourceRequest resourceRequest = reqMap.get(request.getCapability());
@@ -171,7 +150,8 @@ public class OpportunisticContainerContext {
             resourceRequest.getNumContainers() + request.getNumContainers());
       }
       if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+        LOG.info("# of outstandingOpReqs in ANY (at " +
+            "priority = "+ schedulerKey.getPriority()
             + ", with capability = " + request.getCapability() + " ) : "
             + resourceRequest.getNumContainers());
       }
@@ -187,9 +167,9 @@ public class OpportunisticContainerContext {
   public void matchAllocationToOutstandingRequest(Resource capability,
       List<Container> allocatedContainers) {
     for (Container c : allocatedContainers) {
-      containersAllocated.add(c.getId());
+      SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c);
       Map<Resource, ResourceRequest> asks =
-          outstandingOpReqs.get(c.getPriority());
+          outstandingOpReqs.get(schedulerKey);
 
       if (asks == null) {
         continue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
new file mode 100644
index 0000000..9b7edbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+
+/**
+ * Composite key for outstanding scheduler requests for any schedulable entity.
+ * Currently it includes {@link Priority} and the allocation request id.
+ */
+public final class SchedulerRequestKey implements
+    Comparable<SchedulerRequestKey> {
+
+  private final Priority priority;
+  private final long allocationRequestId;
+
+  /**
+   * Factory method to generate a SchedulerRequestKey from a ResourceRequest.
+   * @param req ResourceRequest
+   * @return SchedulerRequestKey
+   */
+  public static SchedulerRequestKey create(ResourceRequest req) {
+    return new SchedulerRequestKey(req.getPriority(),
+        req.getAllocationRequestId());
+  }
+
+  /**
+   * Convenience method to extract the SchedulerRequestKey used to schedule the
+   * Container.
+   * @param container Container
+   * @return SchedulerRequestKey
+   */
+  public static SchedulerRequestKey extractFrom(Container container) {
+    return new SchedulerRequestKey(container.getPriority(),
+        container.getAllocationRequestId());
+  }
+
+  SchedulerRequestKey(Priority priority, long allocationRequestId) {
+    this.priority = priority;
+    this.allocationRequestId = allocationRequestId;
+  }
+
+  /**
+   * Get the {@link Priority} of the request.
+   *
+   * @return the {@link Priority} of the request
+   */
+  public Priority getPriority() {
+    return priority;
+  }
+
+  /**
+   * Get the Id of the associated {@link ResourceRequest}.
+   *
+   * @return the Id of the associated {@link ResourceRequest}
+   */
+  public long getAllocationRequestId() {
+    return allocationRequestId;
+  }
+
+  @Override
+  public int compareTo(SchedulerRequestKey o) {
+    if (o == null) {
+      return (priority != null) ? -1 : 0;
+    } else {
+      if (priority == null) {
+        return 1;
+      }
+    }
+    int priorityCompare = o.getPriority().compareTo(priority);
+    // we first sort by priority and then by allocationRequestId
+    if (priorityCompare != 0) {
+      return priorityCompare;
+    }
+    return Long.compare(allocationRequestId, o.getAllocationRequestId());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SchedulerRequestKey)) {
+      return false;
+    }
+
+    SchedulerRequestKey that = (SchedulerRequestKey) o;
+
+    if (getAllocationRequestId() != that.getAllocationRequestId()) {
+      return false;
+    }
+    return getPriority() != null ?
+        getPriority().equals(that.getPriority()) :
+        that.getPriority() == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getPriority() != null ? getPriority().hashCode() : 0;
+    result = 31 * result + (int) (getAllocationRequestId() ^ (
+        getAllocationRequestId() >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "SchedulerRequestKey{" +
+        "priority=" + priority +
+        ", allocationRequestId=" + allocationRequestId +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index 0f47c93..a9b5ed4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -227,10 +227,10 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
         .partitionAskList(request.getAllocateRequest().getAskList());
 
     // Allocate OPPORTUNISTIC containers.
-    request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
     List<Container> allocatedContainers =
         containerAllocator.allocateContainers(
-            request.getAllocateRequest(), applicationAttemptId,
+            request.getAllocateRequest().getResourceBlacklistRequest(),
+            partitionedAsks.getOpportunistic(), applicationAttemptId,
             oppContainerContext, rmIdentifier, appSubmitter);
 
     // Prepare request for sending to RM for scheduling GUARANTEED containers.
@@ -252,18 +252,11 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
       nodeTokens.put(nmToken.getNodeId(), nmToken);
     }
 
-    oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
-
     // Check if we have NM tokens for all the allocated containers. If not
     // generate one and update the response.
     updateAllocateResponse(
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Number of opportunistic containers currently" +
-          "allocated by application: " + oppContainerContext
-          .getContainersAllocated().size());
-    }
     return dsResp;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index b451f68..c2b0012 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -400,7 +400,6 @@ public class ApplicationMasterService extends AbstractService implements
 
     ApplicationAttemptId appAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();
-    ApplicationId applicationId = appAttemptId.getApplicationId();
 
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
@@ -417,8 +416,10 @@ public class ApplicationMasterService extends AbstractService implements
       AllocateResponse lastResponse = lock.getAllocateResponse();
       if (!hasApplicationMasterRegistered(appAttemptId)) {
         String message =
-            "AM is not registered for known application attempt: " + appAttemptId
-                + " or RM had restarted after AM registered . AM should re-register.";
+            "AM is not registered for known application attempt: "
+                + appAttemptId
+                + " or RM had restarted after AM registered. "
+                + " AM should re-register.";
         throw new ApplicationMasterNotRegisteredException(message);
       }
 
@@ -433,179 +434,10 @@ public class ApplicationMasterService extends AbstractService implements
         throw new InvalidApplicationMasterRequestException(message);
       }
 
-      //filter illegal progress values
-      float filteredProgress = request.getProgress();
-      if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
-        || filteredProgress < 0) {
-         request.setProgress(0);
-      } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
-        request.setProgress(1);
-      }
-
-      // Send the status update to the appAttempt.
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptStatusupdateEvent(appAttemptId, request
-              .getProgress()));
-
-      List<ResourceRequest> ask = request.getAskList();
-      List<ContainerId> release = request.getReleaseList();
-
-      ResourceBlacklistRequest blacklistRequest =
-          request.getResourceBlacklistRequest();
-      List<String> blacklistAdditions =
-          (blacklistRequest != null) ?
-              blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
-      List<String> blacklistRemovals =
-          (blacklistRequest != null) ?
-              blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
-      RMApp app =
-          this.rmContext.getRMApps().get(applicationId);
-      
-      // set label expression for Resource Requests if resourceName=ANY 
-      ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
-      for (ResourceRequest req : ask) {
-        if (null == req.getNodeLabelExpression()
-            && ResourceRequest.ANY.equals(req.getResourceName())) {
-          req.setNodeLabelExpression(asc.getNodeLabelExpression());
-        }
-      }
-      
-      Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
-              
-      // sanity check
-      try {
-        RMServerUtils.normalizeAndValidateRequests(ask,
-            maximumCapacity, app.getQueue(),
-            rScheduler, rmContext);
-      } catch (InvalidResourceRequestException e) {
-        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
-        throw e;
-      }
-      
-      try {
-        RMServerUtils.validateBlacklistRequest(blacklistRequest);
-      }  catch (InvalidResourceBlacklistRequestException e) {
-        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
-        throw e;
-      }
-
-      // In the case of work-preserving AM restart, it's possible for the
-      // AM to release containers from the earlier attempt.
-      if (!app.getApplicationSubmissionContext()
-        .getKeepContainersAcrossApplicationAttempts()) {
-        try {
-          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
-        } catch (InvalidContainerReleaseException e) {
-          LOG.warn("Invalid container release by application " + appAttemptId,
-              e);
-          throw e;
-        }
-      }
-
-      // Split Update Resource Requests into increase and decrease.
-      // No Exceptions are thrown here. All update errors are aggregated
-      // and returned to the AM.
-      List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
-      List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
-      List<UpdateContainerError> updateContainerErrors =
-          RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
-              request, maximumCapacity, increaseResourceReqs,
-              decreaseResourceReqs);
-
-      // Send new requests to appAttempt.
-      Allocation allocation;
-      RMAppAttemptState state =
-          app.getRMAppAttempt(appAttemptId).getAppAttemptState();
-      if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
-          state.equals(RMAppAttemptState.FINISHING) ||
-          app.isAppFinalStateStored()) {
-        LOG.warn(appAttemptId + " is in " + state +
-                 " state, ignore container allocate request.");
-        allocation = EMPTY_ALLOCATION;
-      } else {
-        allocation =
-            this.rScheduler.allocate(appAttemptId, ask, release,
-                blacklistAdditions, blacklistRemovals,
-                increaseResourceReqs, decreaseResourceReqs);
-      }
-
-      if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
-        LOG.info("blacklist are updated in Scheduler." +
-            "blacklistAdditions: " + blacklistAdditions + ", " +
-            "blacklistRemovals: " + blacklistRemovals);
-      }
-      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
-      AllocateResponse allocateResponse =
+      AllocateResponse response =
           recordFactory.newRecordInstance(AllocateResponse.class);
-      if (allocation.getNMTokens() != null &&
-          !allocation.getNMTokens().isEmpty()) {
-        allocateResponse.setNMTokens(allocation.getNMTokens());
-      }
-
-      // Notify the AM of container update errors
-      if (!updateContainerErrors.isEmpty()) {
-        allocateResponse.setUpdateErrors(updateContainerErrors);
-      }
-      // update the response with the deltas of node status changes
-      List<RMNode> updatedNodes = new ArrayList<RMNode>();
-      if(app.pullRMNodeUpdates(updatedNodes) > 0) {
-        List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
-        for(RMNode rmNode: updatedNodes) {
-          SchedulerNodeReport schedulerNodeReport =  
-              rScheduler.getNodeReport(rmNode.getNodeID());
-          Resource used = BuilderUtils.newResource(0, 0);
-          int numContainers = 0;
-          if (schedulerNodeReport != null) {
-            used = schedulerNodeReport.getUsedResource();
-            numContainers = schedulerNodeReport.getNumContainers();
-          }
-          NodeId nodeId = rmNode.getNodeID();
-          NodeReport report =
-              BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
-                  rmNode.getHttpAddress(), rmNode.getRackName(), used,
-                  rmNode.getTotalCapability(), numContainers,
-                  rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
-                  rmNode.getNodeLabels());
-
-          updatedNodeReports.add(report);
-        }
-        allocateResponse.setUpdatedNodes(updatedNodeReports);
-      }
-
-      allocateResponse.setAllocatedContainers(allocation.getContainers());
-      allocateResponse.setCompletedContainersStatuses(appAttempt
-          .pullJustFinishedContainers());
-      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
-      allocateResponse.setAvailableResources(allocation.getResourceLimit());
-      
-      // Handling increased/decreased containers
-      List<UpdatedContainer> updatedContainers = new ArrayList<>();
-      if (allocation.getIncreasedContainers() != null) {
-        for (Container c : allocation.getIncreasedContainers()) {
-          updatedContainers.add(
-              UpdatedContainer.newInstance(
-                  ContainerUpdateType.INCREASE_RESOURCE, c));
-        }
-      }
-      if (allocation.getDecreasedContainers() != null) {
-        for (Container c : allocation.getDecreasedContainers()) {
-          updatedContainers.add(
-              UpdatedContainer.newInstance(
-                  ContainerUpdateType.DECREASE_RESOURCE, c));
-        }
-      }
-
-      allocateResponse.setUpdatedContainers(updatedContainers);
-
-      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-
-      // add preemption to the allocateResponse message (if any)
-      allocateResponse
-          .setPreemptionMessage(generatePreemptionMessage(allocation));
-
-      // Set application priority
-      allocateResponse.setApplicationPriority(app
-          .getApplicationPriority());
+      allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(),
+          request, response);
 
       // update AMRMToken if the token is rolled-up
       MasterKeyData nextMasterKey =
@@ -613,21 +445,24 @@ public class ApplicationMasterService extends AbstractService implements
 
       if (nextMasterKey != null
           && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
-            .getKeyId()) {
+          .getKeyId()) {
+        RMApp app =
+            this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+        RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
         RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
         Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
         if (nextMasterKey.getMasterKey().getKeyId() !=
             appAttemptImpl.getAMRMTokenKeyId()) {
           LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
-              + " to application: " + applicationId);
+              + " to application: " + appAttemptId.getApplicationId());
           amrmToken = rmContext.getAMRMTokenSecretManager()
               .createAndGetAMRMToken(appAttemptId);
           appAttemptImpl.setAMRMToken(amrmToken);
         }
-        allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
-          .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
-            .toString(), amrmToken.getPassword(), amrmToken.getService()
-            .toString()));
+        response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+            .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
+                .toString(), amrmToken.getPassword(), amrmToken.getService()
+                .toString()));
       }
 
       /*
@@ -635,11 +470,220 @@ public class ApplicationMasterService extends AbstractService implements
        * need to worry about unregister call occurring in between (which
        * removes the lock object).
        */
-      lock.setAllocateResponse(allocateResponse);
-      return allocateResponse;
+      response.setResponseId(lastResponse.getResponseId() + 1);
+      lock.setAllocateResponse(response);
+      return response;
     }    
   }
 
+  protected void allocateInternal(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse allocateResponse)
+      throws YarnException {
+
+    //filter illegal progress values
+    float filteredProgress = request.getProgress();
+    if (Float.isNaN(filteredProgress) ||
+        filteredProgress == Float.NEGATIVE_INFINITY ||
+        filteredProgress < 0) {
+      request.setProgress(0);
+    } else if (filteredProgress > 1 ||
+        filteredProgress == Float.POSITIVE_INFINITY) {
+      request.setProgress(1);
+    }
+
+    // Send the status update to the appAttempt.
+    this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptStatusupdateEvent(appAttemptId, request
+            .getProgress()));
+
+    List<ResourceRequest> ask = request.getAskList();
+    List<ContainerId> release = request.getReleaseList();
+
+    ResourceBlacklistRequest blacklistRequest =
+        request.getResourceBlacklistRequest();
+    List<String> blacklistAdditions =
+        (blacklistRequest != null) ?
+            blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
+    List<String> blacklistRemovals =
+        (blacklistRequest != null) ?
+            blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
+    RMApp app =
+        this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+
+    // set label expression for Resource Requests if resourceName=ANY
+    ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
+    for (ResourceRequest req : ask) {
+      if (null == req.getNodeLabelExpression()
+          && ResourceRequest.ANY.equals(req.getResourceName())) {
+        req.setNodeLabelExpression(asc.getNodeLabelExpression());
+      }
+    }
+
+    Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
+
+    // sanity check
+    try {
+      RMServerUtils.normalizeAndValidateRequests(ask,
+          maximumCapacity, app.getQueue(),
+          rScheduler, rmContext);
+    } catch (InvalidResourceRequestException e) {
+      LOG.warn("Invalid resource ask by application " + appAttemptId, e);
+      throw e;
+    }
+
+    try {
+      RMServerUtils.validateBlacklistRequest(blacklistRequest);
+    }  catch (InvalidResourceBlacklistRequestException e) {
+      LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
+      throw e;
+    }
+
+    // In the case of work-preserving AM restart, it's possible for the
+    // AM to release containers from the earlier attempt.
+    if (!app.getApplicationSubmissionContext()
+        .getKeepContainersAcrossApplicationAttempts()) {
+      try {
+        RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+      } catch (InvalidContainerReleaseException e) {
+        LOG.warn("Invalid container release by application " + appAttemptId,
+            e);
+        throw e;
+      }
+    }
+
+    // Split Update Resource Requests into increase and decrease.
+    // No Exceptions are thrown here. All update errors are aggregated
+    // and returned to the AM.
+    List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
+    List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
+    List<UpdateContainerError> updateContainerErrors =
+        RMServerUtils.validateAndSplitUpdateResourceRequests(
+            rmContext, request, maximumCapacity,
+            increaseResourceReqs, decreaseResourceReqs);
+
+    // Send new requests to appAttempt.
+    Allocation allocation;
+    RMAppAttemptState state =
+        app.getRMAppAttempt(appAttemptId).getAppAttemptState();
+    if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
+        state.equals(RMAppAttemptState.FINISHING) ||
+        app.isAppFinalStateStored()) {
+      LOG.warn(appAttemptId + " is in " + state +
+               " state, ignore container allocate request.");
+      allocation = EMPTY_ALLOCATION;
+    } else {
+      allocation =
+          this.rScheduler.allocate(appAttemptId, ask, release,
+              blacklistAdditions, blacklistRemovals,
+              increaseResourceReqs, decreaseResourceReqs);
+    }
+
+    if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
+      LOG.info("blacklist are updated in Scheduler." +
+          "blacklistAdditions: " + blacklistAdditions + ", " +
+          "blacklistRemovals: " + blacklistRemovals);
+    }
+    RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
+
+    if (allocation.getNMTokens() != null &&
+        !allocation.getNMTokens().isEmpty()) {
+      allocateResponse.setNMTokens(allocation.getNMTokens());
+    }
+
+    // Notify the AM of container update errors
+    addToUpdateContainerErrors(allocateResponse, updateContainerErrors);
+
+    // update the response with the deltas of node status changes
+    List<RMNode> updatedNodes = new ArrayList<RMNode>();
+    if(app.pullRMNodeUpdates(updatedNodes) > 0) {
+      List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
+      for(RMNode rmNode: updatedNodes) {
+        SchedulerNodeReport schedulerNodeReport =
+            rScheduler.getNodeReport(rmNode.getNodeID());
+        Resource used = BuilderUtils.newResource(0, 0);
+        int numContainers = 0;
+        if (schedulerNodeReport != null) {
+          used = schedulerNodeReport.getUsedResource();
+          numContainers = schedulerNodeReport.getNumContainers();
+        }
+        NodeId nodeId = rmNode.getNodeID();
+        NodeReport report =
+            BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
+                rmNode.getHttpAddress(), rmNode.getRackName(), used,
+                rmNode.getTotalCapability(), numContainers,
+                rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
+                rmNode.getNodeLabels());
+
+        updatedNodeReports.add(report);
+      }
+      allocateResponse.setUpdatedNodes(updatedNodeReports);
+    }
+
+    addToAllocatedContainers(allocateResponse, allocation.getContainers());
+
+    allocateResponse.setCompletedContainersStatuses(appAttempt
+        .pullJustFinishedContainers());
+    allocateResponse.setAvailableResources(allocation.getResourceLimit());
+
+    // Handling increased containers
+    addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
+        allocation.getIncreasedContainers());
+
+    // Handling decreased containers
+    addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
+        allocation.getDecreasedContainers());
+
+    allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+
+    // add preemption to the allocateResponse message (if any)
+    allocateResponse
+        .setPreemptionMessage(generatePreemptionMessage(allocation));
+
+    // Set application priority
+    allocateResponse.setApplicationPriority(app
+        .getApplicationPriority());
+  }
+
+  protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
+      List<UpdateContainerError> updateContainerErrors) {
+    if (!updateContainerErrors.isEmpty()) {
+      if (allocateResponse.getUpdateErrors() != null
+          && !allocateResponse.getUpdateErrors().isEmpty()) {
+        updateContainerErrors = new ArrayList<>(updateContainerErrors);
+        updateContainerErrors.addAll(allocateResponse.getUpdateErrors());
+      }
+      allocateResponse.setUpdateErrors(updateContainerErrors);
+    }
+  }
+
+  protected void addToUpdatedContainers(AllocateResponse allocateResponse,
+      ContainerUpdateType updateType, List<Container> updatedContainers) {
+    if (updatedContainers != null && updatedContainers.size() > 0) {
+      ArrayList<UpdatedContainer> containersToSet = new ArrayList<>();
+      if (allocateResponse.getUpdatedContainers() != null &&
+          !allocateResponse.getUpdatedContainers().isEmpty()) {
+        containersToSet.addAll(allocateResponse.getUpdatedContainers());
+      }
+      for (Container updatedContainer : updatedContainers) {
+        containersToSet.add(
+            UpdatedContainer.newInstance(updateType, updatedContainer));
+      }
+      allocateResponse.setUpdatedContainers(containersToSet);
+    }
+  }
+
+  protected void addToAllocatedContainers(AllocateResponse allocateResponse,
+      List<Container> allocatedContainers) {
+    if (allocateResponse.getAllocatedContainers() != null
+        && !allocateResponse.getAllocatedContainers().isEmpty()) {
+      allocatedContainers = new ArrayList<>(allocatedContainers);
+      allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
+    }
+    allocateResponse.setAllocatedContainers(allocatedContainers);
+  }
+
   private PreemptionMessage generatePreemptionMessage(Allocation allocation){
     PreemptionMessage pMsg = null;
     // assemble strict preemption request

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 10e5275..708b481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -101,8 +101,8 @@ public class OpportunisticContainerAllocatorAMService
   private final int k;
 
   private final long cacheRefreshInterval;
-  private List<RemoteNode> cachedNodes;
-  private long lastCacheUpdateTime;
+  private volatile List<RemoteNode> cachedNodes;
+  private volatile long lastCacheUpdateTime;
 
   public OpportunisticContainerAllocatorAMService(RMContext rmContext,
       YarnScheduler scheduler) {
@@ -218,8 +218,9 @@ public class OpportunisticContainerAllocatorAMService
   }
 
   @Override
-  public AllocateResponse allocate(AllocateRequest request) throws
-      YarnException, IOException {
+  protected void allocateInternal(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse allocateResponse)
+      throws YarnException {
 
     // Partition requests to GUARANTEED and OPPORTUNISTIC.
     OpportunisticContainerAllocator.PartitionedResourceRequests
@@ -227,40 +228,30 @@ public class OpportunisticContainerAllocatorAMService
         oppContainerAllocator.partitionAskList(request.getAskList());
 
     // Allocate OPPORTUNISTIC containers.
-    request.setAskList(partitionedAsks.getOpportunistic());
-    final ApplicationAttemptId appAttemptId = getAppAttemptId();
-    SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
-        rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
+    SchedulerApplicationAttempt appAttempt =
+        ((AbstractYarnScheduler)rmContext.getScheduler())
+            .getApplicationAttempt(appAttemptId);
 
     OpportunisticContainerContext oppCtx =
         appAttempt.getOpportunisticContainerContext();
     oppCtx.updateNodeList(getLeastLoadedNodes());
 
     List<Container> oppContainers =
-        oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
-        ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
+        oppContainerAllocator.allocateContainers(
+            request.getResourceBlacklistRequest(),
+            partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
+            ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
 
     // Create RMContainers and update the NMTokens.
     if (!oppContainers.isEmpty()) {
       handleNewContainers(oppContainers, false);
       appAttempt.updateNMTokens(oppContainers);
+      addToAllocatedContainers(allocateResponse, oppContainers);
     }
 
     // Allocate GUARANTEED containers.
     request.setAskList(partitionedAsks.getGuaranteed());
-    AllocateResponse allocateResp = super.allocate(request);
-
-    // Add allocated OPPORTUNISTIC containers to the AllocateResponse.
-    if (!oppContainers.isEmpty()) {
-      allocateResp.getAllocatedContainers().addAll(oppContainers);
-    }
-
-    // Update opportunistic container context with the allocated GUARANTEED
-    // containers.
-    oppCtx.updateCompletedContainers(allocateResp);
-
-    // Add all opportunistic containers
-    return allocateResp;
+    super.allocateInternal(appAttemptId, request, allocateResponse);
   }
 
   @Override
@@ -304,7 +295,7 @@ public class OpportunisticContainerAllocatorAMService
   }
 
   private void handleNewContainers(List<Container> allocContainers,
-                                   boolean isRemotelyAllocated) {
+      boolean isRemotelyAllocated) {
     for (Container container : allocContainers) {
       // Create RMContainer
       SchedulerApplicationAttempt appAttempt =
@@ -387,10 +378,12 @@ public class OpportunisticContainerAllocatorAMService
   private synchronized List<RemoteNode> getLeastLoadedNodes() {
     long currTime = System.currentTimeMillis();
     if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
-        || cachedNodes == null) {
+        || (cachedNodes == null)) {
       cachedNodes = convertToRemoteNodes(
           this.nodeMonitor.selectLeastLoadedNodes(this.k));
-      lastCacheUpdateTime = currTime;
+      if (cachedNodes.size() > 0) {
+        lastCacheUpdateTime = currTime;
+      }
     }
     return cachedNodes;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index a244ad8..020764b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -25,14 +25,13 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index e46cd3b..0afd765 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
     .RMNodeDecreaseContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
index d7d1e94..80e7c0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 /**
  * The event signifying that a container has been reserved.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index b9deb6c..13ea465 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index bc52816..6a5b090 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -73,12 +72,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 6744c2e..759db05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.collect.ImmutableSet;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
deleted file mode 100644
index 4b640ae..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-
-/**
- * Composite key for outstanding scheduler requests for any schedulable entity.
- * Currently it includes {@link Priority}.
- */
-public final class SchedulerRequestKey implements
-    Comparable<SchedulerRequestKey> {
-
-  private final Priority priority;
-  private final long allocationRequestId;
-
-  /**
-   * Factory method to generate a SchedulerRequestKey from a ResourceRequest.
-   * @param req ResourceRequest
-   * @return SchedulerRequestKey
-   */
-  public static SchedulerRequestKey create(ResourceRequest req) {
-    return new SchedulerRequestKey(req.getPriority(),
-        req.getAllocationRequestId());
-  }
-
-  /**
-   * Convenience method to extract the SchedulerRequestKey used to schedule the
-   * Container.
-   * @param container Container
-   * @return SchedulerRequestKey
-   */
-  public static SchedulerRequestKey extractFrom(Container container) {
-    return new SchedulerRequestKey(container.getPriority(),
-        container.getAllocationRequestId());
-  }
-
-  private SchedulerRequestKey(Priority priority, long allocationRequestId) {
-    this.priority = priority;
-    this.allocationRequestId = allocationRequestId;
-  }
-
-  /**
-   * Get the {@link Priority} of the request.
-   *
-   * @return the {@link Priority} of the request
-   */
-  public Priority getPriority() {
-    return priority;
-  }
-
-  /**
-   * Get the Id of the associated {@link ResourceRequest}.
-   *
-   * @return the Id of the associated {@link ResourceRequest}
-   */
-  public long getAllocationRequestId() {
-    return allocationRequestId;
-  }
-
-  @Override
-  public int compareTo(SchedulerRequestKey o) {
-    if (o == null) {
-      return (priority != null) ? -1 : 0;
-    } else {
-      if (priority == null) {
-        return 1;
-      }
-    }
-    int priorityCompare = o.getPriority().compareTo(priority);
-    // we first sort by priority and then by allocationRequestId
-    if (priorityCompare != 0) {
-      return priorityCompare;
-    }
-    return Long.compare(allocationRequestId, o.getAllocationRequestId());
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof SchedulerRequestKey)) {
-      return false;
-    }
-
-    SchedulerRequestKey that = (SchedulerRequestKey) o;
-
-    if (getAllocationRequestId() != that.getAllocationRequestId()) {
-      return false;
-    }
-    return getPriority() != null ?
-        getPriority().equals(that.getPriority()) :
-        that.getPriority() == null;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = getPriority() != null ? getPriority().hashCode() : 0;
-    result = 31 * result + (int) (getAllocationRequestId() ^ (
-        getAllocationRequestId() >>> 32));
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5e4e441..ace0b75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -2216,7 +2217,8 @@ public class LeafQueue extends AbstractCSQueue {
   @Override
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
-    if (application != null) {
+    if (application != null && rmContainer != null
+        && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
@@ -2234,7 +2236,8 @@ public class LeafQueue extends AbstractCSQueue {
   @Override
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
-    if (application != null) {
+    if (application != null && rmContainer != null
+          && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
index 74a64c1..0dc527f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index f5026ed..1eb48bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
index 8b4907b..159fb09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 /**
  * Contexts for a container inside scheduler

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f076e4f..1022be7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
@@ -69,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index d79fcaf..344daf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index dec55ca..fb67270 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -61,9 +60,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
       if (getMetric(o1) == getMetric(o2)) {
-        return o1.timestamp < o2.timestamp ? +1 : -1;
+        return (int)(o2.timestamp - o1.timestamp);
       }
-      return getMetric(o1) > getMetric(o2) ? +1 : -1;
+      return getMetric(o1) - getMetric(o2);
     }
 
     public int getMetric(ClusterNode c) {
@@ -115,8 +114,13 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
       writeLock.lock();
       try {
-        sortedNodes.clear();
-        sortedNodes.addAll(sortNodes());
+        try {
+          List<NodeId> nodeIds = sortNodes();
+          sortedNodes.clear();
+          sortedNodes.addAll(nodeIds);
+        } catch (Exception ex) {
+          LOG.warn("Got Exception while sorting nodes..", ex);
+        }
         if (thresholdCalculator != null) {
           thresholdCalculator.update();
         }
@@ -273,7 +277,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ?
           new ArrayList<>(this.sortedNodes).subList(0, k) :
           new ArrayList<>(this.sortedNodes);
-      return Collections.unmodifiableList(retVal);
+      return retVal;
     } finally {
       readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 0686bc2..94030e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81da7d1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 024ec67..85aab9b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 @Private


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message