hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dran...@apache.org
Subject [10/50] [abbrv] hadoop git commit: YARN-5392. Replace use of Priority in the Scheduling infrastructure with an opaque ShedulerRequestKey. (asuresh and subru)
Date Mon, 01 Aug 2016 15:55:01 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index aae5292..4bae5be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -32,6 +31,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
@@ -80,7 +83,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   
   private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, Priority priority) {
+      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
     if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
       application.updateAppSkipNodeDiagnostics(
           CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
@@ -88,7 +91,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     ResourceRequest anyRequest =
-        application.getResourceRequest(priority, ResourceRequest.ANY);
+        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (null == anyRequest) {
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
@@ -97,7 +100,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     Resource required = anyRequest.getCapability();
 
     // Do we need containers at this 'priority'?
-    if (application.getTotalRequiredResources(priority) <= 0) {
+    if (application.getTotalRequiredResources(schedulerKey) <= 0) {
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
 
@@ -126,7 +129,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     if (!application.getCSLeafQueue().getReservationContinueLooking()) {
-      if (!shouldAllocOrReserveNewContainer(priority, required)) {
+      if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("doesn't need containers based on reservation algo!");
         }
@@ -143,7 +146,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     // Inform the application it is about to get a scheduling opportunity
-    application.addSchedulingOpportunity(priority);
+    application.addSchedulingOpportunity(schedulerKey);
 
     // Increase missed-non-partitioned-resource-request-opportunity.
     // This is to make sure non-partitioned-resource-request will prefer
@@ -152,8 +155,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     if (anyRequest.getNodeLabelExpression()
         .equals(RMNodeLabelsManager.NO_LABEL)) {
       missedNonPartitionedRequestSchedulingOpportunity =
-          application
-              .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
+          application.addMissedNonPartitionedRequestSchedulingOpportunity(
+              schedulerKey);
     }
 
     if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
@@ -164,7 +167,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           .getScheduler().getNumClusterNodes()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
-              + " priority=" + priority
+              + " priority=" + schedulerKey.getPriority()
               + " because missed-non-partitioned-resource-request"
               + " opportunity under requred:" + " Now="
               + missedNonPartitionedRequestSchedulingOpportunity + " required="
@@ -180,20 +183,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   ContainerAllocation preAllocation(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, Priority priority,
+      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer) {
     ContainerAllocation result;
     if (null == reservedContainer) {
       // pre-check when allocating new container
       result =
           preCheckForNewContainer(clusterResource, node, schedulingMode,
-              resourceLimits, priority);
+              resourceLimits, schedulerKey);
       if (null != result) {
         return result;
       }
     } else {
       // pre-check when allocating reserved container
-      if (application.getTotalRequiredResources(priority) == 0) {
+      if (application.getTotalRequiredResources(schedulerKey) == 0) {
         // Release
         return new ContainerAllocation(reservedContainer, null,
             AllocationState.QUEUE_SKIPPED);
@@ -202,13 +205,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     // Try to allocate containers on node
     result =
-        assignContainersOnNode(clusterResource, node, priority,
+        assignContainersOnNode(clusterResource, node, schedulerKey,
             reservedContainer, schedulingMode, resourceLimits);
     
     if (null == reservedContainer) {
       if (result.state == AllocationState.PRIORITY_SKIPPED) {
         // Don't count 'skipped nodes' as a scheduling opportunity!
-        application.subtractSchedulingOpportunity(priority);
+        application.subtractSchedulingOpportunity(schedulerKey);
       }
     }
     
@@ -216,10 +219,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
   
   public synchronized float getLocalityWaitFactor(
-      Priority priority, int clusterNodes) {
+      SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = 
-        Math.max(application.getResourceRequests(priority).size() - 1, 0);
+        Math.max(application.getResourceRequests(schedulerKey).size() - 1, 0);
     
     // waitFactor can't be more than '1' 
     // i.e. no point skipping more than clustersize opportunities
@@ -231,8 +234,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         .getCSLeafQueue().getNodeLocalityDelay());
   }
 
-  private boolean canAssign(Priority priority, FiCaSchedulerNode node,
-      NodeType type, RMContainer reservedContainer) {
+  private boolean canAssign(SchedulerRequestKey schedulerKey,
+      FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
 
     // Clearly we need containers for this application...
     if (type == NodeType.OFF_SWITCH) {
@@ -242,15 +245,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
       // 'Delay' off-switch
       ResourceRequest offSwitchRequest =
-          application.getResourceRequest(priority, ResourceRequest.ANY);
-      long missedOpportunities = application.getSchedulingOpportunities(priority);
+          application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
+      long missedOpportunities =
+          application.getSchedulingOpportunities(schedulerKey);
       long requiredContainers = offSwitchRequest.getNumContainers();
 
       float localityWaitFactor =
-          getLocalityWaitFactor(priority, rmContext.getScheduler()
+          getLocalityWaitFactor(schedulerKey, rmContext.getScheduler()
               .getNumClusterNodes());
-      // Cap the delay by the number of nodes in the cluster. Under most conditions
-      // this means we will consider each node in the cluster before
+      // Cap the delay by the number of nodes in the cluster. Under most
+      // conditions this means we will consider each node in the cluster before
       // accepting an off-switch assignment.
       return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
           (requiredContainers * localityWaitFactor)) < missedOpportunities);
@@ -258,7 +262,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     // Check if we need containers on this rack
     ResourceRequest rackLocalRequest =
-        application.getResourceRequest(priority, node.getRackName());
+        application.getResourceRequest(schedulerKey, node.getRackName());
     if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
       return false;
     }
@@ -266,7 +270,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // If we are here, we do need containers on this rack for RACK_LOCAL req
     if (type == NodeType.RACK_LOCAL) {
       // 'Delay' rack-local just a little bit...
-      long missedOpportunities = application.getSchedulingOpportunities(priority);
+      long missedOpportunities =
+          application.getSchedulingOpportunities(schedulerKey);
       return getActualNodeLocalityDelay() < missedOpportunities;
     }
 
@@ -274,7 +279,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     if (type == NodeType.NODE_LOCAL) {
       // Now check if we need containers on this host...
       ResourceRequest nodeLocalRequest =
-          application.getResourceRequest(priority, node.getNodeName());
+          application.getResourceRequest(schedulerKey, node.getNodeName());
       if (nodeLocalRequest != null) {
         return nodeLocalRequest.getNumContainers() > 0;
       }
@@ -285,10 +290,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   private ContainerAllocation assignNodeLocalContainers(
       Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
-      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
+      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
+      RMContainer reservedContainer, SchedulingMode schedulingMode,
+      ResourceLimits currentResoureLimits) {
+    if (canAssign(schedulerKey, node, NodeType.NODE_LOCAL, reservedContainer)) {
+      return assignContainer(clusterResource, node, schedulerKey,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
           schedulingMode, currentResoureLimits);
     }
@@ -299,10 +305,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   private ContainerAllocation assignRackLocalContainers(
       Resource clusterResource, ResourceRequest rackLocalResourceRequest,
-      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
+      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
+      RMContainer reservedContainer, SchedulingMode schedulingMode,
+      ResourceLimits currentResoureLimits) {
+    if (canAssign(schedulerKey, node, NodeType.RACK_LOCAL, reservedContainer)) {
+      return assignContainer(clusterResource, node, schedulerKey,
           rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
           schedulingMode, currentResoureLimits);
     }
@@ -313,10 +320,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   private ContainerAllocation assignOffSwitchContainers(
       Resource clusterResource, ResourceRequest offSwitchResourceRequest,
-      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
+      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
+      RMContainer reservedContainer, SchedulingMode schedulingMode,
+      ResourceLimits currentResoureLimits) {
+    if (canAssign(schedulerKey, node, NodeType.OFF_SWITCH, reservedContainer)) {
+      return assignContainer(clusterResource, node, schedulerKey,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
           schedulingMode, currentResoureLimits);
     }
@@ -327,20 +335,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
 
   private ContainerAllocation assignContainersOnNode(Resource clusterResource,
-      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
+      RMContainer reservedContainer, SchedulingMode schedulingMode,
+      ResourceLimits currentResoureLimits) {
 
     ContainerAllocation allocation;
 
     NodeType requestType = null;
     // Data-local
     ResourceRequest nodeLocalResourceRequest =
-        application.getResourceRequest(priority, node.getNodeName());
+        application.getResourceRequest(schedulerKey, node.getNodeName());
     if (nodeLocalResourceRequest != null) {
       requestType = NodeType.NODE_LOCAL;
       allocation =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
-              node, priority, reservedContainer, schedulingMode,
+              node, schedulerKey, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
           allocation.getResourceToBeAllocated(), Resources.none())) {
@@ -351,7 +360,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     // Rack-local
     ResourceRequest rackLocalResourceRequest =
-        application.getResourceRequest(priority, node.getRackName());
+        application.getResourceRequest(schedulerKey, node.getRackName());
     if (rackLocalResourceRequest != null) {
       if (!rackLocalResourceRequest.getRelaxLocality()) {
         return ContainerAllocation.PRIORITY_SKIPPED;
@@ -363,7 +372,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
       allocation =
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
-              node, priority, reservedContainer, schedulingMode,
+              node, schedulerKey, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
           allocation.getResourceToBeAllocated(), Resources.none())) {
@@ -374,7 +383,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     // Off-switch
     ResourceRequest offSwitchResourceRequest =
-        application.getResourceRequest(priority, ResourceRequest.ANY);
+        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (offSwitchResourceRequest != null) {
       if (!offSwitchResourceRequest.getRelaxLocality()) {
         return ContainerAllocation.PRIORITY_SKIPPED;
@@ -386,7 +395,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
       allocation =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-              node, priority, reservedContainer, schedulingMode,
+              node, schedulerKey, reservedContainer, schedulingMode,
               currentResoureLimits);
       allocation.requestNodeType = requestType;
       
@@ -403,21 +412,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
 
   private ContainerAllocation assignContainer(Resource clusterResource,
-      FiCaSchedulerNode node, Priority priority, ResourceRequest request,
-      NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
+      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
+      ResourceRequest request, NodeType type, RMContainer rmContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
     lastResourceRequest = request;
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
-        + " application=" + application.getApplicationId()
-        + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type);
+          + " application=" + application.getApplicationId()
+          + " priority=" + schedulerKey.getPriority()
+          + " request=" + request + " type=" + type);
     }
 
     // check if the resource request can access the label
     if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
-        request.getNodeLabelExpression(), node.getPartition(), schedulingMode)) {
+        request.getNodeLabelExpression(), node.getPartition(),
+        schedulingMode)) {
       // this is a reserved container, but we cannot allocate it now according
       // to label not match. This can be caused by node label changed
       // We should un-reserve this container.
@@ -439,7 +449,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
-        priority, capability);
+        schedulerKey, capability);
 
     // Can we allocate a container on this node?
     long availableContainers =
@@ -504,8 +514,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
             resourceNeedToUnReserve = capability;
           }
           unreservedContainer =
-              application.findNodeToUnreserve(clusterResource, node, priority,
-                  resourceNeedToUnReserve);
+              application.findNodeToUnreserve(clusterResource, node,
+                  schedulerKey, resourceNeedToUnReserve);
           // When (minimum-unreserved-resource > 0 OR we cannot allocate
           // new/reserved
           // container (That means we *have to* unreserve some resource to
@@ -553,28 +563,28 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
   }
 
-  boolean
-      shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
-    int requiredContainers = application.getTotalRequiredResources(priority);
-    int reservedContainers = application.getNumReservedContainers(priority);
+  boolean shouldAllocOrReserveNewContainer(
+      SchedulerRequestKey schedulerKey, Resource required) {
+    int requiredContainers =
+        application.getTotalRequiredResources(schedulerKey);
+    int reservedContainers = application.getNumReservedContainers(schedulerKey);
     int starvation = 0;
     if (reservedContainers > 0) {
-      float nodeFactor =
-          Resources
-              .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation());
+      float nodeFactor = Resources.ratio(
+          rc, required, application.getCSLeafQueue().getMaximumAllocation());
 
       // Use percentage of node required to bias against large containers...
       // Protect against corner case where you need the whole node with
       // Math.min(nodeFactor, minimumAllocationFactor)
       starvation =
-          (int) ((application.getReReservations(priority) / 
+          (int) ((application.getReReservations(schedulerKey) /
               (float) reservedContainers) * (1.0f - (Math.min(
                   nodeFactor, application.getCSLeafQueue()
                   .getMinimumAllocationFactor()))));
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("needsContainers:" + " app.#re-reserve="
-            + application.getReReservations(priority) + " reserved="
+            + application.getReReservations(schedulerKey) + " reserved="
             + reservedContainers + " nodeFactor=" + nodeFactor
             + " minAllocFactor="
             + application.getCSLeafQueue().getMinimumAllocationFactor()
@@ -585,13 +595,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
   
   private Container getContainer(RMContainer rmContainer,
-      FiCaSchedulerNode node, Resource capability, Priority priority) {
+      FiCaSchedulerNode node, Resource capability,
+      SchedulerRequestKey schedulerKey) {
     return (rmContainer != null) ? rmContainer.getContainer()
-        : createContainer(node, capability, priority);
+        : createContainer(node, capability, schedulerKey);
   }
 
   private Container createContainer(FiCaSchedulerNode node, Resource capability,
-      Priority priority) {
+      SchedulerRequestKey schedulerKey) {
 
     NodeId nodeId = node.getRMNode().getNodeID();
     ContainerId containerId =
@@ -600,22 +611,23 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     // Create the container
     return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-        .getHttpAddress(), capability, priority, null);
+        .getHttpAddress(), capability, schedulerKey.getPriority(), null);
   }
   
   private ContainerAllocation handleNewContainerAllocation(
       ContainerAllocation allocationResult, FiCaSchedulerNode node,
-      Priority priority, RMContainer reservedContainer, Container container) {
+      SchedulerRequestKey schedulerKey, RMContainer reservedContainer,
+      Container container) {
     // Handling container allocation
     // Did we previously reserve containers at this 'priority'?
     if (reservedContainer != null) {
-      application.unreserve(priority, node, reservedContainer);
+      application.unreserve(schedulerKey, node, reservedContainer);
     }
     
     // Inform the application
     RMContainer allocatedContainer =
         application.allocate(allocationResult.containerNodeType, node,
-            priority, lastResourceRequest, container);
+            schedulerKey, lastResourceRequest, container);
 
     // Does the application need this resource?
     if (allocatedContainer == null) {
@@ -637,12 +649,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
 
   ContainerAllocation doAllocation(ContainerAllocation allocationResult,
-      FiCaSchedulerNode node, Priority priority,
+      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer) {
     // Create the container if necessary
     Container container =
         getContainer(reservedContainer, node,
-            allocationResult.getResourceToBeAllocated(), priority);
+            allocationResult.getResourceToBeAllocated(), schedulerKey);
 
     // something went wrong getting/creating the container
     if (container == null) {
@@ -655,11 +667,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
       // When allocating container
       allocationResult =
-          handleNewContainerAllocation(allocationResult, node, priority,
+          handleNewContainerAllocation(allocationResult, node, schedulerKey,
               reservedContainer, container);
     } else {
       // When reserving container
-      application.reserve(priority, node, reservedContainer, container);
+      application.reserve(schedulerKey, node, reservedContainer, container);
     }
     allocationResult.updatedContainer = container;
 
@@ -678,14 +690,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         // RACK_LOCAL without delay.
         if (allocationResult.containerNodeType == NodeType.NODE_LOCAL
             || application.getCSLeafQueue().getRackLocalityFullReset()) {
-          application.resetSchedulingOpportunities(priority);
+          application.resetSchedulingOpportunities(schedulerKey);
         }
       }
 
       // Non-exclusive scheduling opportunity is different: we need reset
       // it every time to make sure non-labeled resource request will be
       // most likely allocated on non-labeled nodes first.
-      application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
+      application.resetMissedNonPartitionedRequestSchedulingOpportunity(
+          schedulerKey);
     }
 
     return allocationResult;
@@ -693,15 +706,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   
   private ContainerAllocation allocate(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, Priority priority,
+      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer) {
     ContainerAllocation result =
         preAllocation(clusterResource, node, schedulingMode, resourceLimits,
-            priority, reservedContainer);
+            schedulerKey, reservedContainer);
 
     if (AllocationState.ALLOCATED == result.state
         || AllocationState.RESERVED == result.state) {
-      result = doAllocation(result, node, priority, reservedContainer);
+      result = doAllocation(result, node, schedulerKey, reservedContainer);
     }
 
     return result;
@@ -725,10 +738,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
       
       // Schedule in priority order
-      for (Priority priority : application.getPriorities()) {
+      for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
         ContainerAllocation result =
             allocate(clusterResource, node, schedulingMode, resourceLimits,
-                priority, null);
+                schedulerKey, null);
 
         AllocationState allocationState = result.getAllocationState();
         if (allocationState == AllocationState.PRIORITY_SKIPPED) {
@@ -744,7 +757,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     } else {
       ContainerAllocation result =
           allocate(clusterResource, node, schedulingMode, resourceLimits,
-              reservedContainer.getReservedPriority(), reservedContainer);
+              reservedContainer.getReservedSchedulerKey(), reservedContainer);
       return getCSAssignmentFromAllocateResult(clusterResource, result,
           reservedContainer);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 8009580..67d93a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
@@ -181,7 +182,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
-      Priority priority, ResourceRequest request, 
+      SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container container) {
 
     if (isStopped) {
@@ -190,10 +191,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     
     // Required sanity check - AM can call 'allocate' to update resource 
     // request without locking the scheduler, hence we need to check
-    if (getTotalRequiredResources(priority) <= 0) {
+    if (getTotalRequiredResources(schedulerKey) <= 0) {
       return null;
     }
-    
+
     // Create RMContainer
     RMContainer rmContainer =
         new RMContainerImpl(container, this.getApplicationAttemptId(),
@@ -211,7 +212,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-        type, node, priority, request, container);
+        type, node, schedulerKey, request, container);
 
     attemptResourceUsage.incUsed(node.getPartition(), container.getResource());
 
@@ -235,13 +236,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return rmContainer;
   }
 
-  public synchronized boolean unreserve(Priority priority,
+  public synchronized boolean unreserve(SchedulerRequestKey schedulerKey,
       FiCaSchedulerNode node, RMContainer rmContainer) {
     // Cancel increase request (if it has reserved increase request 
     rmContainer.cancelIncreaseReservation();
     
     // Done with the reservation?
-    if (internalUnreserve(node, priority)) {
+    if (internalUnreserve(node, schedulerKey)) {
       node.unreserveResource(this);
 
       // Update reserved metrics
@@ -254,12 +255,14 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
-  private boolean internalUnreserve(FiCaSchedulerNode node, Priority priority) {
+  private boolean internalUnreserve(FiCaSchedulerNode node,
+      SchedulerRequestKey schedulerKey) {
     Map<NodeId, RMContainer> reservedContainers =
-      this.reservedContainers.get(priority);
+        this.reservedContainers.get(schedulerKey);
 
     if (reservedContainers != null) {
-      RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+      RMContainer reservedContainer =
+          reservedContainers.remove(node.getNodeID());
 
       // unreserve is now triggered in new scenarios (preemption)
       // as a consequence reservedcontainer might be null, adding NP-checks
@@ -268,17 +271,18 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           && reservedContainer.getContainer().getResource() != null) {
 
         if (reservedContainers.isEmpty()) {
-          this.reservedContainers.remove(priority);
+          this.reservedContainers.remove(schedulerKey);
         }
         // Reset the re-reservation count
-        resetReReservations(priority);
+        resetReReservations(schedulerKey);
 
         Resource resource = reservedContainer.getReservedResource();
         this.attemptResourceUsage.decReserved(node.getPartition(), resource);
 
         LOG.info("Application " + getApplicationId() + " unreserved "
             + " on node " + node + ", currently has "
-            + reservedContainers.size() + " at priority " + priority
+            + reservedContainers.size()
+            + " at priority " + schedulerKey.getPriority()
             + "; currentReservation " + this.attemptResourceUsage.getReserved()
             + " on node-label=" + node.getPartition());
         return true;
@@ -288,10 +292,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   public synchronized float getLocalityWaitFactor(
-      Priority priority, int clusterNodes) {
+      SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = 
-        Math.max(this.getResourceRequests(priority).size() - 1, 0);
+        Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
     
     // waitFactor can't be more than '1' 
     // i.e. no point skipping more than clustersize opportunities
@@ -354,14 +358,14 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         newlyIncreasedContainers, newlyDecreasedContainers);
   }
   
-  synchronized public NodeId getNodeIdToUnreserve(Priority priority,
-      Resource resourceNeedUnreserve, ResourceCalculator rc,
-      Resource clusterResource) {
+  synchronized public NodeId getNodeIdToUnreserve(
+      SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
+      ResourceCalculator rc, Resource clusterResource) {
 
     // first go around make this algorithm simple and just grab first
     // reservation that has enough resources
     Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
-        .get(priority);
+        .get(schedulerKey);
 
     if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
       for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
@@ -417,17 +421,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       ((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
   }
   
-  public boolean reserveIncreasedContainer(Priority priority, 
+  public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
       FiCaSchedulerNode node,
       RMContainer rmContainer, Resource reservedResource) {
     // Inform the application
-    if (super.reserveIncreasedContainer(node, priority, rmContainer,
+    if (super.reserveIncreasedContainer(node, schedulerKey, rmContainer,
         reservedResource)) {
 
       queue.getMetrics().reserveResource(getUser(), reservedResource);
 
       // Update the node
-      node.reserveResource(this, priority, rmContainer);
+      node.reserveResource(this, schedulerKey, rmContainer);
       
       // Succeeded
       return true;
@@ -436,7 +440,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
-  public void reserve(Priority priority,
+  public void reserve(SchedulerRequestKey schedulerKey,
       FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
     // Update reserved metrics if this is the first reservation
     if (rmContainer == null) {
@@ -445,19 +449,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
 
     // Inform the application
-    rmContainer = super.reserve(node, priority, rmContainer, container);
+    rmContainer = super.reserve(node, schedulerKey, rmContainer, container);
 
     // Update the node
-    node.reserveResource(this, priority, rmContainer);
+    node.reserveResource(this, schedulerKey, rmContainer);
   }
 
   @VisibleForTesting
   public RMContainer findNodeToUnreserve(Resource clusterResource,
-      FiCaSchedulerNode node, Priority priority,
+      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       Resource minimumUnreservedResource) {
     // need to unreserve some other container first
     NodeId idToUnreserve =
-        getNodeIdToUnreserve(priority, minimumUnreservedResource,
+        getNodeIdToUnreserve(schedulerKey, minimumUnreservedResource,
             rc, clusterResource);
     if (idToUnreserve == null) {
       if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index f90a53c..d79fcaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -23,12 +23,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -54,7 +55,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
 
   @Override
   public synchronized void reserveResource(
-      SchedulerApplicationAttempt application, Priority priority,
+      SchedulerApplicationAttempt application, SchedulerRequestKey priority,
       RMContainer container) {
     // Check if it's already reserved
     RMContainer reservedContainer = getReservedContainer();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 1eead9a..8f074cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -97,8 +98,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * at the current allowed level and the time since the last container
    * was scheduled. Currently we use only the former.
    */
-  private final Map<Priority, NodeType> allowedLocalityLevel =
-      new HashMap<Priority, NodeType>();
+  private final Map<SchedulerRequestKey, NodeType> allowedLocalityLevel =
+      new HashMap<>();
 
   public FSAppAttempt(FairScheduler scheduler,
       ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue,
@@ -163,23 +164,23 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   private synchronized void unreserveInternal(
-      Priority priority, FSSchedulerNode node) {
+      SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
     Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
+        this.reservedContainers.get(schedulerKey);
     RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
     if (reservedContainers.isEmpty()) {
-      this.reservedContainers.remove(priority);
+      this.reservedContainers.remove(schedulerKey);
     }
     
     // Reset the re-reservation count
-    resetReReservations(priority);
+    resetReReservations(schedulerKey);
 
     Resource resource = reservedContainer.getContainer().getResource();
     this.attemptResourceUsage.decReserved(resource);
 
     LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
         + node + ", currently has " + reservedContainers.size()
-        + " at priority " + priority + "; currentReservation "
+        + " at priority " + schedulerKey.getPriority() + "; currentReservation "
         + this.attemptResourceUsage.getReserved());
   }
 
@@ -239,10 +240,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   public synchronized float getLocalityWaitFactor(
-      Priority priority, int clusterNodes) {
+      SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = 
-        Math.max(this.getResourceRequests(priority).size() - 1, 0);
+        Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
     
     // waitFactor can't be more than '1' 
     // i.e. no point skipping more than clustersize opportunities
@@ -254,9 +255,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * current size of the cluster and thresholds indicating how many nodes to
    * fail at (as a fraction of cluster size) before relaxing scheduling
    * constraints.
+   * @param schedulerKey SchedulerRequestKey
+   * @param numNodes Num Nodes
+   * @param nodeLocalityThreshold nodeLocalityThreshold
+   * @param rackLocalityThreshold rackLocalityThreshold
+   * @return NodeType
    */
-  public synchronized NodeType getAllowedLocalityLevel(Priority priority,
-      int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
+  public synchronized NodeType getAllowedLocalityLevel(
+      SchedulerRequestKey schedulerKey, int numNodes,
+      double nodeLocalityThreshold, double rackLocalityThreshold) {
     // upper limit on threshold
     if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
     if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
@@ -267,12 +274,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
 
     // Default level is NODE_LOCAL
-    if (!allowedLocalityLevel.containsKey(priority)) {
-      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
+    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
+      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
       return NodeType.NODE_LOCAL;
     }
 
-    NodeType allowed = allowedLocalityLevel.get(priority);
+    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
 
     // If level is already most liberal, we're done
     if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
@@ -281,27 +288,32 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       rackLocalityThreshold;
 
     // Relax locality constraints once we've surpassed threshold.
-    if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
+    if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
       if (allowed.equals(NodeType.NODE_LOCAL)) {
-        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
-        resetSchedulingOpportunities(priority);
+        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
+        resetSchedulingOpportunities(schedulerKey);
       }
       else if (allowed.equals(NodeType.RACK_LOCAL)) {
-        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
-        resetSchedulingOpportunities(priority);
+        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
+        resetSchedulingOpportunities(schedulerKey);
       }
     }
-    return allowedLocalityLevel.get(priority);
+    return allowedLocalityLevel.get(schedulerKey);
   }
 
   /**
    * Return the level at which we are allowed to schedule containers.
    * Given the thresholds indicating how much time passed before relaxing
    * scheduling constraints.
+   * @param schedulerKey SchedulerRequestKey
+   * @param nodeLocalityDelayMs nodeLocalityThreshold
+   * @param rackLocalityDelayMs nodeLocalityDelayMs
+   * @param currentTimeMs currentTimeMs
+   * @return NodeType
    */
-  public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
-          long nodeLocalityDelayMs, long rackLocalityDelayMs,
-          long currentTimeMs) {
+  public synchronized NodeType getAllowedLocalityLevelByTime(
+      SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs,
+      long rackLocalityDelayMs, long currentTimeMs) {
 
     // if not being used, can schedule anywhere
     if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
@@ -309,19 +321,19 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
 
     // default level is NODE_LOCAL
-    if (!allowedLocalityLevel.containsKey(priority)) {
+    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
       // add the initial time of priority to prevent comparing with FsApp
       // startTime and allowedLocalityLevel degrade
-      lastScheduledContainer.put(priority, currentTimeMs);
+      lastScheduledContainer.put(schedulerKey, currentTimeMs);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Init the lastScheduledContainer time, priority: " + priority
-            + ", time: " + currentTimeMs);
+        LOG.debug("Init the lastScheduledContainer time, priority: "
+            + schedulerKey.getPriority() + ", time: " + currentTimeMs);
       }
-      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
+      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
       return NodeType.NODE_LOCAL;
     }
 
-    NodeType allowed = allowedLocalityLevel.get(priority);
+    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
 
     // if level is already most liberal, we're done
     if (allowed.equals(NodeType.OFF_SWITCH)) {
@@ -330,8 +342,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     // check waiting time
     long waitTime = currentTimeMs;
-    if (lastScheduledContainer.containsKey(priority)) {
-      waitTime -= lastScheduledContainer.get(priority);
+    if (lastScheduledContainer.containsKey(schedulerKey)) {
+      waitTime -= lastScheduledContainer.get(schedulerKey);
     } else {
       waitTime -= getStartTime();
     }
@@ -341,43 +353,43 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     if (waitTime > thresholdTime) {
       if (allowed.equals(NodeType.NODE_LOCAL)) {
-        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
-        resetSchedulingOpportunities(priority, currentTimeMs);
+        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
+        resetSchedulingOpportunities(schedulerKey, currentTimeMs);
       } else if (allowed.equals(NodeType.RACK_LOCAL)) {
-        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
-        resetSchedulingOpportunities(priority, currentTimeMs);
+        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
+        resetSchedulingOpportunities(schedulerKey, currentTimeMs);
       }
     }
-    return allowedLocalityLevel.get(priority);
+    return allowedLocalityLevel.get(schedulerKey);
   }
 
   synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
-      Priority priority, ResourceRequest request,
+      SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container reservedContainer) {
     // Update allowed locality level
-    NodeType allowed = allowedLocalityLevel.get(priority);
+    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
     if (allowed != null) {
       if (allowed.equals(NodeType.OFF_SWITCH) &&
           (type.equals(NodeType.NODE_LOCAL) ||
               type.equals(NodeType.RACK_LOCAL))) {
-        this.resetAllowedLocalityLevel(priority, type);
+        this.resetAllowedLocalityLevel(schedulerKey, type);
       }
       else if (allowed.equals(NodeType.RACK_LOCAL) &&
           type.equals(NodeType.NODE_LOCAL)) {
-        this.resetAllowedLocalityLevel(priority, type);
+        this.resetAllowedLocalityLevel(schedulerKey, type);
       }
     }
 
     // Required sanity check - AM can call 'allocate' to update resource 
     // request without locking the scheduler, hence we need to check
-    if (getTotalRequiredResources(priority) <= 0) {
+    if (getTotalRequiredResources(schedulerKey) <= 0) {
       return null;
     }
 
     Container container = reservedContainer;
     if (container == null) {
       container =
-          createContainer(node, request.getCapability(), request.getPriority());
+          createContainer(node, request.getCapability(), schedulerKey);
     }
     
     // Create RMContainer
@@ -392,7 +404,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-        type, node, priority, request, container);
+        type, node, schedulerKey, request, container);
     this.attemptResourceUsage.incUsed(container.getResource());
 
     // Update resource requests related to "request" and store in RMContainer
@@ -419,13 +431,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * Should be called when the scheduler assigns a container at a higher
    * degree of locality than the current threshold. Reset the allowed locality
    * level to a higher degree of locality.
+   * @param schedulerKey Scheduler Key
+   * @param level NodeType
    */
-  public synchronized void resetAllowedLocalityLevel(Priority priority,
-      NodeType level) {
-    NodeType old = allowedLocalityLevel.get(priority);
+  public synchronized void resetAllowedLocalityLevel(
+      SchedulerRequestKey schedulerKey, NodeType level) {
+    NodeType old = allowedLocalityLevel.get(schedulerKey);
     LOG.info("Raising locality level from " + old + " to " + level + " at " +
-        " priority " + priority);
-    allowedLocalityLevel.put(priority, level);
+        " priority " + schedulerKey.getPriority());
+    allowedLocalityLevel.put(schedulerKey, level);
   }
 
   // related methods
@@ -468,9 +482,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * Create and return a container object reflecting an allocation for the
    * given appliction on the given node with the given capability and
    * priority.
+   * @param node Node
+   * @param capability Capability
+   * @param schedulerKey Scheduler Key
+   * @return Container
    */
-  public Container createContainer(
-      FSSchedulerNode node, Resource capability, Priority priority) {
+  public Container createContainer(FSSchedulerNode node, Resource capability,
+      SchedulerRequestKey schedulerKey) {
 
     NodeId nodeId = node.getRMNode().getNodeID();
     ContainerId containerId = BuilderUtils.newContainerId(
@@ -479,7 +497,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // Create the container
     Container container =
         BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-            .getHttpAddress(), capability, priority, null);
+            .getHttpAddress(), capability, schedulerKey.getPriority(), null);
 
     return container;
   }
@@ -492,26 +510,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * return whether reservation was possible with the current threshold limits
    */
   private boolean reserve(ResourceRequest request, FSSchedulerNode node,
-      Container reservedContainer, NodeType type) {
+      Container reservedContainer, NodeType type,
+      SchedulerRequestKey schedulerKey) {
 
-    Priority priority = request.getPriority();
     if (!reservationExceedsThreshold(node, type)) {
       LOG.info("Making reservation: node=" + node.getNodeName() +
               " app_id=" + getApplicationId());
       if (reservedContainer == null) {
         reservedContainer =
             createContainer(node, request.getCapability(),
-              request.getPriority());
+              schedulerKey);
         getMetrics().reserveResource(getUser(),
             reservedContainer.getResource());
         RMContainer rmContainer =
-                super.reserve(node, priority, null, reservedContainer);
-        node.reserveResource(this, priority, rmContainer);
+                super.reserve(node, schedulerKey, null, reservedContainer);
+        node.reserveResource(this, schedulerKey, rmContainer);
         setReservation(node);
       } else {
         RMContainer rmContainer = node.getReservedContainer();
-        super.reserve(node, priority, rmContainer, reservedContainer);
-        node.reserveResource(this, priority, rmContainer);
+        super.reserve(node, schedulerKey, rmContainer, reservedContainer);
+        node.reserveResource(this, schedulerKey, rmContainer);
         setReservation(node);
       }
       return true;
@@ -548,13 +566,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
     return false;
   }
+
   /**
-   * Remove the reservation on {@code node} at the given {@link Priority}.
+   * Remove the reservation on {@code node} at the given SchedulerRequestKey.
    * This dispatches SchedulerNode handlers as well.
+   * @param schedulerKey Scheduler Key
+   * @param node Node
    */
-  public void unreserve(Priority priority, FSSchedulerNode node) {
+  public void unreserve(SchedulerRequestKey schedulerKey,
+      FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    unreserveInternal(priority, node);
+    unreserveInternal(schedulerKey, node);
     node.unreserveResource(this);
     clearReservation(node);
     getMetrics().unreserveResource(
@@ -618,7 +640,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    */
   private Resource assignContainer(
       FSSchedulerNode node, ResourceRequest request, NodeType type,
-      boolean reserved) {
+      boolean reserved, SchedulerRequestKey schedulerKey) {
 
     // How much does this request need?
     Resource capability = request.getCapability();
@@ -635,19 +657,19 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     if (Resources.fitsIn(capability, available)) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
-          allocate(type, node, request.getPriority(), request,
+          allocate(type, node, schedulerKey, request,
               reservedContainer);
       if (allocatedContainer == null) {
         // Did the application need this resource?
         if (reserved) {
-          unreserve(request.getPriority(), node);
+          unreserve(schedulerKey, node);
         }
         return Resources.none();
       }
 
       // If we had previously made a reservation, delete it
       if (reserved) {
-        unreserve(request.getPriority(), node);
+        unreserve(schedulerKey, node);
       }
 
       // Inform the node
@@ -667,7 +689,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     // The desired container won't fit here, so reserve
     if (isReservable(capability) &&
-        reserve(request, node, reservedContainer, type)) {
+        reserve(request, node, reservedContainer, type, schedulerKey)) {
       return FairScheduler.CONTAINER_RESERVED;
     } else {
       if (LOG.isDebugEnabled()) {
@@ -683,8 +705,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         getQueue().getPolicy().getResourceCalculator(), capacity);
   }
 
-  private boolean hasNodeOrRackLocalRequests(Priority priority) {
-    return getResourceRequests(priority).size() > 1;
+  private boolean hasNodeOrRackLocalRequests(SchedulerRequestKey schedulerKey) {
+    return getResourceRequests(schedulerKey).size() > 1;
   }
 
   /**
@@ -707,26 +729,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
     }
 
-    Collection<Priority> prioritiesToTry = (reserved) ?
-        Arrays.asList(node.getReservedContainer().getReservedPriority()) :
-        getPriorities();
+    Collection<SchedulerRequestKey> keysToTry = (reserved) ?
+        Arrays.asList(node.getReservedContainer().getReservedSchedulerKey()) :
+        getSchedulerKeys();
 
     // For each priority, see if we can schedule a node local, rack local
     // or off-switch request. Rack of off-switch requests may be delayed
     // (not scheduled) in order to promote better locality.
     synchronized (this) {
-      for (Priority priority : prioritiesToTry) {
+      for (SchedulerRequestKey schedulerKey : keysToTry) {
         // Skip it for reserved container, since
         // we already check it in isValidReservation.
-        if (!reserved && !hasContainerForNode(priority, node)) {
+        if (!reserved && !hasContainerForNode(schedulerKey, node)) {
           continue;
         }
 
-        addSchedulingOpportunity(priority);
+        addSchedulingOpportunity(schedulerKey);
 
-        ResourceRequest rackLocalRequest = getResourceRequest(priority,
+        ResourceRequest rackLocalRequest = getResourceRequest(schedulerKey,
             node.getRackName());
-        ResourceRequest localRequest = getResourceRequest(priority,
+        ResourceRequest localRequest = getResourceRequest(schedulerKey,
             node.getNodeName());
 
         if (localRequest != null && !localRequest.getRelaxLocality()) {
@@ -736,12 +758,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
         NodeType allowedLocality;
         if (scheduler.isContinuousSchedulingEnabled()) {
-          allowedLocality = getAllowedLocalityLevelByTime(priority,
+          allowedLocality = getAllowedLocalityLevelByTime(schedulerKey,
               scheduler.getNodeLocalityDelayMs(),
               scheduler.getRackLocalityDelayMs(),
               scheduler.getClock().getTime());
         } else {
-          allowedLocality = getAllowedLocalityLevel(priority,
+          allowedLocality = getAllowedLocalityLevel(schedulerKey,
               scheduler.getNumClusterNodes(),
               scheduler.getNodeLocalityThreshold(),
               scheduler.getRackLocalityThreshold());
@@ -750,7 +772,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
           return assignContainer(node, localRequest,
-              NodeType.NODE_LOCAL, reserved);
+              NodeType.NODE_LOCAL, reserved, schedulerKey);
         }
 
         if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
@@ -761,21 +783,22 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
             && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
             allowedLocality.equals(NodeType.OFF_SWITCH))) {
           return assignContainer(node, rackLocalRequest,
-              NodeType.RACK_LOCAL, reserved);
+              NodeType.RACK_LOCAL, reserved, schedulerKey);
         }
 
         ResourceRequest offSwitchRequest =
-            getResourceRequest(priority, ResourceRequest.ANY);
+            getResourceRequest(schedulerKey, ResourceRequest.ANY);
         if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
           continue;
         }
 
         if (offSwitchRequest != null &&
             offSwitchRequest.getNumContainers() != 0) {
-          if (!hasNodeOrRackLocalRequests(priority) ||
+          if (!hasNodeOrRackLocalRequests(schedulerKey) ||
               allowedLocality.equals(NodeType.OFF_SWITCH)) {
             return assignContainer(
-                node, offSwitchRequest, NodeType.OFF_SWITCH, reserved);
+                node, offSwitchRequest, NodeType.OFF_SWITCH, reserved,
+                schedulerKey);
           }
         }
       }
@@ -787,10 +810,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * Whether this app has containers requests that could be satisfied on the
    * given node, if the node had full space.
    */
-  private boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
-    ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY);
-    ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName());
-    ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName());
+  private boolean hasContainerForNode(SchedulerRequestKey key,
+      FSSchedulerNode node) {
+    ResourceRequest anyRequest = getResourceRequest(key, ResourceRequest.ANY);
+    ResourceRequest rackRequest = getResourceRequest(key, node.getRackName());
+    ResourceRequest nodeRequest = getResourceRequest(key, node.getNodeName());
 
     return
         // There must be outstanding requests at the given priority:
@@ -812,9 +836,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   private boolean isValidReservation(FSSchedulerNode node) {
-    Priority reservedPriority = node.getReservedContainer().
-        getReservedPriority();
-    return hasContainerForNode(reservedPriority, node) &&
+    SchedulerRequestKey schedulerKey = node.getReservedContainer().
+        getReservedSchedulerKey();
+    return hasContainerForNode(schedulerKey, node) &&
         !isOverAMShareLimit();
   }
 
@@ -830,13 +854,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    */
   public boolean assignReservedContainer(FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    Priority reservedPriority = rmContainer.getReservedPriority();
+    SchedulerRequestKey reservedSchedulerKey =
+        rmContainer.getReservedSchedulerKey();
 
     if (!isValidReservation(node)) {
       // Don't hold the reservation if app can no longer use it
       LOG.info("Releasing reservation that cannot be satisfied for " +
           "application " + getApplicationAttemptId() + " on node " + node);
-      unreserve(reservedPriority, node);
+      unreserve(reservedSchedulerKey, node);
       return false;
     }
 
@@ -938,8 +963,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     // Add up outstanding resource requests
     synchronized (this) {
-      for (Priority p : getPriorities()) {
-        ResourceRequest r = getResourceRequest(p, ResourceRequest.ANY);
+      for (SchedulerRequestKey k : getSchedulerKeys()) {
+        ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY);
         if (r != null) {
           Resources.multiplyAndAddTo(demand,
               r.getCapability(), r.getNumContainers());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index c86201a..024ec67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -23,10 +23,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 @Private
@@ -43,7 +43,7 @@ public class FSSchedulerNode extends SchedulerNode {
 
   @Override
   public synchronized void reserveResource(
-      SchedulerApplicationAttempt application, Priority priority,
+      SchedulerApplicationAttempt application, SchedulerRequestKey schedulerKey,
       RMContainer container) {
     // Check if it's already reserved
     RMContainer reservedContainer = getReservedContainer();
@@ -102,4 +102,5 @@ public class FSSchedulerNode extends SchedulerNode {
   public synchronized FSAppAttempt getReservedAppSchedulable() {
     return reservedAppSchedulable;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index bc953ba..ac384a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -866,7 +866,7 @@ public class FairScheduler extends
     FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
 
     if (rmContainer.getState() == RMContainerState.RESERVED) {
-      application.unreserve(rmContainer.getReservedPriority(), node);
+      application.unreserve(rmContainer.getReservedSchedulerKey(), node);
     } else {
       application.containerCompleted(rmContainer, containerStatus, event);
       node.releaseContainer(container);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index eaab495..fe8d0af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -514,14 +515,15 @@ public class FifoScheduler extends
           continue;
         }
 
-        for (Priority priority : application.getPriorities()) {
-          int maxContainers = 
-            getMaxAllocatableContainers(application, priority, node, 
-                NodeType.OFF_SWITCH); 
+        for (SchedulerRequestKey schedulerKey :
+            application.getSchedulerKeys()) {
+          int maxContainers =
+              getMaxAllocatableContainers(application, schedulerKey, node,
+                  NodeType.OFF_SWITCH);
           // Ensure the application needs containers of this priority
           if (maxContainers > 0) {
-            int assignedContainers = 
-              assignContainersOnNode(node, application, priority);
+            int assignedContainers =
+                assignContainersOnNode(node, application, schedulerKey);
             // Do not assign out of order w.r.t priorities
             if (assignedContainers == 0) {
               break;
@@ -553,11 +555,11 @@ public class FifoScheduler extends
   }
 
   private int getMaxAllocatableContainers(FiCaSchedulerApp application,
-      Priority priority, FiCaSchedulerNode node, NodeType type) {
+      SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, NodeType type) {
     int maxContainers = 0;
-    
-    ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, ResourceRequest.ANY);
+
+    ResourceRequest offSwitchRequest =
+        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (offSwitchRequest != null) {
       maxContainers = offSwitchRequest.getNumContainers();
     }
@@ -567,8 +569,9 @@ public class FifoScheduler extends
     }
 
     if (type == NodeType.RACK_LOCAL) {
-      ResourceRequest rackLocalRequest = 
-        application.getResourceRequest(priority, node.getRMNode().getRackName());
+      ResourceRequest rackLocalRequest =
+          application.getResourceRequest(schedulerKey, node.getRMNode()
+              .getRackName());
       if (rackLocalRequest == null) {
         return maxContainers;
       }
@@ -577,8 +580,9 @@ public class FifoScheduler extends
     }
 
     if (type == NodeType.NODE_LOCAL) {
-      ResourceRequest nodeLocalRequest = 
-        application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
+      ResourceRequest nodeLocalRequest =
+          application.getResourceRequest(schedulerKey, node.getRMNode()
+              .getNodeAddress());
       if (nodeLocalRequest != null) {
         maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
       }
@@ -589,25 +593,25 @@ public class FifoScheduler extends
 
 
   private int assignContainersOnNode(FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, Priority priority 
+      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey
   ) {
     // Data-local
-    int nodeLocalContainers = 
-      assignNodeLocalContainers(node, application, priority); 
+    int nodeLocalContainers =
+        assignNodeLocalContainers(node, application, schedulerKey);
 
     // Rack-local
-    int rackLocalContainers = 
-      assignRackLocalContainers(node, application, priority);
+    int rackLocalContainers =
+        assignRackLocalContainers(node, application, schedulerKey);
 
     // Off-switch
     int offSwitchContainers =
-      assignOffSwitchContainers(node, application, priority);
+        assignOffSwitchContainers(node, application, schedulerKey);
 
 
     LOG.debug("assignContainersOnNode:" +
         " node=" + node.getRMNode().getNodeAddress() + 
         " application=" + application.getApplicationId().getId() +
-        " priority=" + priority.getPriority() + 
+        " priority=" + schedulerKey.getPriority() +
         " #assigned=" + 
         (nodeLocalContainers + rackLocalContainers + offSwitchContainers));
 
@@ -616,14 +620,14 @@ public class FifoScheduler extends
   }
 
   private int assignNodeLocalContainers(FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, Priority priority) {
+      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request = 
-      application.getResourceRequest(priority, node.getNodeName());
+    ResourceRequest request =
+        application.getResourceRequest(schedulerKey, node.getNodeName());
     if (request != null) {
       // Don't allocate on this node if we don't need containers on this rack
       ResourceRequest rackRequest =
-          application.getResourceRequest(priority, 
+          application.getResourceRequest(schedulerKey,
               node.getRMNode().getRackName());
       if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
         return 0;
@@ -631,61 +635,62 @@ public class FifoScheduler extends
       
       int assignableContainers = 
         Math.min(
-            getMaxAllocatableContainers(application, priority, node, 
+            getMaxAllocatableContainers(application, schedulerKey, node,
                 NodeType.NODE_LOCAL), 
                 request.getNumContainers());
       assignedContainers = 
-        assignContainer(node, application, priority, 
+        assignContainer(node, application, schedulerKey,
             assignableContainers, request, NodeType.NODE_LOCAL);
     }
     return assignedContainers;
   }
 
   private int assignRackLocalContainers(FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, Priority priority) {
+      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request = 
-      application.getResourceRequest(priority, node.getRMNode().getRackName());
+    ResourceRequest request =
+        application.getResourceRequest(schedulerKey, node.getRMNode()
+            .getRackName());
     if (request != null) {
       // Don't allocate on this rack if the application doens't need containers
       ResourceRequest offSwitchRequest =
-          application.getResourceRequest(priority, ResourceRequest.ANY);
+          application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
       if (offSwitchRequest.getNumContainers() <= 0) {
         return 0;
       }
       
       int assignableContainers = 
         Math.min(
-            getMaxAllocatableContainers(application, priority, node, 
+            getMaxAllocatableContainers(application, schedulerKey, node,
                 NodeType.RACK_LOCAL), 
                 request.getNumContainers());
       assignedContainers = 
-        assignContainer(node, application, priority, 
+        assignContainer(node, application, schedulerKey,
             assignableContainers, request, NodeType.RACK_LOCAL);
     }
     return assignedContainers;
   }
 
   private int assignOffSwitchContainers(FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, Priority priority) {
+      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request = 
-      application.getResourceRequest(priority, ResourceRequest.ANY);
+    ResourceRequest request =
+        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (request != null) {
       assignedContainers = 
-        assignContainer(node, application, priority, 
+        assignContainer(node, application, schedulerKey,
             request.getNumContainers(), request, NodeType.OFF_SWITCH);
     }
     return assignedContainers;
   }
 
   private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
-      Priority priority, int assignableContainers, 
+      SchedulerRequestKey schedulerKey, int assignableContainers,
       ResourceRequest request, NodeType type) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() + 
         " application=" + application.getApplicationId().getId() + 
-        " priority=" + priority.getPriority() + 
+        " priority=" + schedulerKey.getPriority().getPriority() +
         " assignableContainers=" + assignableContainers +
         " request=" + request + " type=" + type);
     Resource capability = request.getCapability();
@@ -707,13 +712,13 @@ public class FifoScheduler extends
         // Create the container
         Container container =
             BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-              .getHttpAddress(), capability, priority, null);
+              .getHttpAddress(), capability, schedulerKey.getPriority(), null);
         
         // Allocate!
         
         // Inform the application
         RMContainer rmContainer =
-            application.allocate(type, node, priority, request, container);
+            application.allocate(type, node, schedulerKey, request, container);
         
         // Inform the node
         node.allocateContainer(rmContainer);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message