From: asuresh@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 26 Jul 2016 21:55:09 -0000
Message-Id: <7d0b5206a4114052b369aadbc0fdefec@git.apache.org>
In-Reply-To: <14e0d293381248a08660dd8fdc90b272@git.apache.org>
Subject: [2/3] hadoop git commit: YARN-5392. Replace use of Priority in the Scheduling infrastructure with an opaque SchedulerRequestKey. (asuresh and subru)
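[Editor's note] The patch threads a SchedulerRequestKey through every scheduler code path that previously took a raw Priority. The diff itself pins down three constraints on the new type: it exposes getPriority() (used in log messages and when building Containers), it serves as a HashMap key in FSAppAttempt and FiCaSchedulerApp (so it needs equals/hashCode), and getSchedulerKeys() replaces the "schedule in priority order" loop over getPriorities() (so it must order like Priority). A minimal sketch of a wrapper satisfying those constraints follows; the diff never shows the class body, so everything beyond getPriority() -- including the constructor -- is inferred for illustration, not the actual implementation:

import org.apache.hadoop.yarn.api.records.Priority;

// Illustrative sketch only; the real class lives in
// org.apache.hadoop.yarn.server.resourcemanager.scheduler.
public final class SchedulerRequestKey
    implements Comparable<SchedulerRequestKey> {

  private final Priority priority;

  public SchedulerRequestKey(Priority priority) { // hypothetical constructor
    this.priority = priority;
  }

  // The only accessor the diff actually calls.
  public Priority getPriority() {
    return priority;
  }

  // Keeps iteration over getSchedulerKeys() equivalent to the old
  // priority-ordered loop over getPriorities().
  @Override
  public int compareTo(SchedulerRequestKey other) {
    return priority.compareTo(other.priority);
  }

  // Required because allowedLocalityLevel, lastScheduledContainer and
  // reservedContainers in the patch are maps keyed on this type.
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof SchedulerRequestKey)) {
      return false;
    }
    return priority.equals(((SchedulerRequestKey) obj).priority);
  }

  @Override
  public int hashCode() {
    return priority.hashCode();
  }
}

The point of the opacity: call sites such as application.getResourceRequest(schedulerKey, ResourceRequest.ANY) no longer know what identifies a request, so identifying fields can later be added to the key without touching any scheduler.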
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index aae5292..4bae5be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -32,6 +31,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
@@ -80,7 +83,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, Priority priority) {
+      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
     if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
       application.updateAppSkipNodeDiagnostics(
           CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
@@ -88,7 +91,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     ResourceRequest anyRequest =
-        application.getResourceRequest(priority, ResourceRequest.ANY);
+        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (null == anyRequest) {
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
@@ -97,7 +100,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     Resource required = anyRequest.getCapability();
 
     // Do we need containers at this 'priority'?
- if (application.getTotalRequiredResources(priority) <= 0) { + if (application.getTotalRequiredResources(schedulerKey) <= 0) { return ContainerAllocation.PRIORITY_SKIPPED; } @@ -126,7 +129,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } if (!application.getCSLeafQueue().getReservationContinueLooking()) { - if (!shouldAllocOrReserveNewContainer(priority, required)) { + if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) { if (LOG.isDebugEnabled()) { LOG.debug("doesn't need containers based on reservation algo!"); } @@ -143,7 +146,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // Inform the application it is about to get a scheduling opportunity - application.addSchedulingOpportunity(priority); + application.addSchedulingOpportunity(schedulerKey); // Increase missed-non-partitioned-resource-request-opportunity. // This is to make sure non-partitioned-resource-request will prefer @@ -152,8 +155,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { if (anyRequest.getNodeLabelExpression() .equals(RMNodeLabelsManager.NO_LABEL)) { missedNonPartitionedRequestSchedulingOpportunity = - application - .addMissedNonPartitionedRequestSchedulingOpportunity(priority); + application.addMissedNonPartitionedRequestSchedulingOpportunity( + schedulerKey); } if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { @@ -164,7 +167,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { .getScheduler().getNumClusterNodes()) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() - + " priority=" + priority + + " priority=" + schedulerKey.getPriority() + " because missed-non-partitioned-resource-request" + " opportunity under requred:" + " Now=" + missedNonPartitionedRequestSchedulingOpportunity + " required=" @@ -180,20 +183,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { ContainerAllocation preAllocation(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, Priority priority, + ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { ContainerAllocation result; if (null == reservedContainer) { // pre-check when allocating new container result = preCheckForNewContainer(clusterResource, node, schedulingMode, - resourceLimits, priority); + resourceLimits, schedulerKey); if (null != result) { return result; } } else { // pre-check when allocating reserved container - if (application.getTotalRequiredResources(priority) == 0) { + if (application.getTotalRequiredResources(schedulerKey) == 0) { // Release return new ContainerAllocation(reservedContainer, null, AllocationState.QUEUE_SKIPPED); @@ -202,13 +205,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Try to allocate containers on node result = - assignContainersOnNode(clusterResource, node, priority, + assignContainersOnNode(clusterResource, node, schedulerKey, reservedContainer, schedulingMode, resourceLimits); if (null == reservedContainer) { if (result.state == AllocationState.PRIORITY_SKIPPED) { // Don't count 'skipped nodes' as a scheduling opportunity! 
- application.subtractSchedulingOpportunity(priority); + application.subtractSchedulingOpportunity(schedulerKey); } } @@ -216,10 +219,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } public synchronized float getLocalityWaitFactor( - Priority priority, int clusterNodes) { + SchedulerRequestKey schedulerKey, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) int requiredResources = - Math.max(application.getResourceRequests(priority).size() - 1, 0); + Math.max(application.getResourceRequests(schedulerKey).size() - 1, 0); // waitFactor can't be more than '1' // i.e. no point skipping more than clustersize opportunities @@ -231,8 +234,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { .getCSLeafQueue().getNodeLocalityDelay()); } - private boolean canAssign(Priority priority, FiCaSchedulerNode node, - NodeType type, RMContainer reservedContainer) { + private boolean canAssign(SchedulerRequestKey schedulerKey, + FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) { // Clearly we need containers for this application... if (type == NodeType.OFF_SWITCH) { @@ -242,15 +245,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // 'Delay' off-switch ResourceRequest offSwitchRequest = - application.getResourceRequest(priority, ResourceRequest.ANY); - long missedOpportunities = application.getSchedulingOpportunities(priority); + application.getResourceRequest(schedulerKey, ResourceRequest.ANY); + long missedOpportunities = + application.getSchedulingOpportunities(schedulerKey); long requiredContainers = offSwitchRequest.getNumContainers(); float localityWaitFactor = - getLocalityWaitFactor(priority, rmContext.getScheduler() + getLocalityWaitFactor(schedulerKey, rmContext.getScheduler() .getNumClusterNodes()); - // Cap the delay by the number of nodes in the cluster. Under most conditions - // this means we will consider each node in the cluster before + // Cap the delay by the number of nodes in the cluster. Under most + // conditions this means we will consider each node in the cluster before // accepting an off-switch assignment. return (Math.min(rmContext.getScheduler().getNumClusterNodes(), (requiredContainers * localityWaitFactor)) < missedOpportunities); @@ -258,7 +262,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Check if we need containers on this rack ResourceRequest rackLocalRequest = - application.getResourceRequest(priority, node.getRackName()); + application.getResourceRequest(schedulerKey, node.getRackName()); if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) { return false; } @@ -266,7 +270,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // If we are here, we do need containers on this rack for RACK_LOCAL req if (type == NodeType.RACK_LOCAL) { // 'Delay' rack-local just a little bit... - long missedOpportunities = application.getSchedulingOpportunities(priority); + long missedOpportunities = + application.getSchedulingOpportunities(schedulerKey); return getActualNodeLocalityDelay() < missedOpportunities; } @@ -274,7 +279,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { if (type == NodeType.NODE_LOCAL) { // Now check if we need containers on this host... 
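// [Editor's note -- illustrative sketch, not part of the patch] The
// OFF_SWITCH branch of canAssign() above delays an off-switch assignment
// until the application has missed
//     min(numClusterNodes, requiredContainers * localityWaitFactor)
// scheduling opportunities. A self-contained rendering of that predicate,
// with the class and method names invented for the example:
final class OffSwitchDelaySketch {
  static boolean canAssignOffSwitch(int numClusterNodes,
      long requiredContainers, float localityWaitFactor,
      long missedOpportunities) {
    // Cap the delay by the number of nodes in the cluster, exactly as the
    // comment in the patch says.
    return Math.min(numClusterNodes,
        requiredContainers * localityWaitFactor) < missedOpportunities;
  }

  public static void main(String[] args) {
    // 100-node cluster, 4 containers wanted, waitFactor 0.1 (desired
    // hosts+racks relative to cluster size): the threshold is 0.4, so a
    // single missed opportunity already allows off-switch.
    System.out.println(canAssignOffSwitch(100, 4, 0.1f, 1));     // true
    // 2000 containers wanted: 2000 * 0.1 = 200 is capped at the cluster
    // size, so the app waits a full 100-opportunity sweep.
    System.out.println(canAssignOffSwitch(100, 2000, 0.1f, 99));  // false
    System.out.println(canAssignOffSwitch(100, 2000, 0.1f, 101)); // true
  }
}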
ResourceRequest nodeLocalRequest = - application.getResourceRequest(priority, node.getNodeName()); + application.getResourceRequest(schedulerKey, node.getNodeName()); if (nodeLocalRequest != null) { return nodeLocalRequest.getNumContainers() > 0; } @@ -285,10 +290,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { private ContainerAllocation assignNodeLocalContainers( Resource clusterResource, ResourceRequest nodeLocalResourceRequest, - FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) { - return assignContainer(clusterResource, node, priority, + FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, + RMContainer reservedContainer, SchedulingMode schedulingMode, + ResourceLimits currentResoureLimits) { + if (canAssign(schedulerKey, node, NodeType.NODE_LOCAL, reservedContainer)) { + return assignContainer(clusterResource, node, schedulerKey, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, schedulingMode, currentResoureLimits); } @@ -299,10 +305,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { private ContainerAllocation assignRackLocalContainers( Resource clusterResource, ResourceRequest rackLocalResourceRequest, - FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) { - return assignContainer(clusterResource, node, priority, + FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, + RMContainer reservedContainer, SchedulingMode schedulingMode, + ResourceLimits currentResoureLimits) { + if (canAssign(schedulerKey, node, NodeType.RACK_LOCAL, reservedContainer)) { + return assignContainer(clusterResource, node, schedulerKey, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, schedulingMode, currentResoureLimits); } @@ -313,10 +320,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { private ContainerAllocation assignOffSwitchContainers( Resource clusterResource, ResourceRequest offSwitchResourceRequest, - FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) { - return assignContainer(clusterResource, node, priority, + FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, + RMContainer reservedContainer, SchedulingMode schedulingMode, + ResourceLimits currentResoureLimits) { + if (canAssign(schedulerKey, node, NodeType.OFF_SWITCH, reservedContainer)) { + return assignContainer(clusterResource, node, schedulerKey, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, schedulingMode, currentResoureLimits); } @@ -327,20 +335,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } private ContainerAllocation assignContainersOnNode(Resource clusterResource, - FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, + RMContainer reservedContainer, SchedulingMode schedulingMode, + ResourceLimits currentResoureLimits) { ContainerAllocation allocation; NodeType requestType = null; // 
Data-local ResourceRequest nodeLocalResourceRequest = - application.getResourceRequest(priority, node.getNodeName()); + application.getResourceRequest(schedulerKey, node.getNodeName()); if (nodeLocalResourceRequest != null) { requestType = NodeType.NODE_LOCAL; allocation = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, - node, priority, reservedContainer, schedulingMode, + node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); if (Resources.greaterThan(rc, clusterResource, allocation.getResourceToBeAllocated(), Resources.none())) { @@ -351,7 +360,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Rack-local ResourceRequest rackLocalResourceRequest = - application.getResourceRequest(priority, node.getRackName()); + application.getResourceRequest(schedulerKey, node.getRackName()); if (rackLocalResourceRequest != null) { if (!rackLocalResourceRequest.getRelaxLocality()) { return ContainerAllocation.PRIORITY_SKIPPED; @@ -363,7 +372,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { allocation = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, - node, priority, reservedContainer, schedulingMode, + node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); if (Resources.greaterThan(rc, clusterResource, allocation.getResourceToBeAllocated(), Resources.none())) { @@ -374,7 +383,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Off-switch ResourceRequest offSwitchResourceRequest = - application.getResourceRequest(priority, ResourceRequest.ANY); + application.getResourceRequest(schedulerKey, ResourceRequest.ANY); if (offSwitchResourceRequest != null) { if (!offSwitchResourceRequest.getRelaxLocality()) { return ContainerAllocation.PRIORITY_SKIPPED; @@ -386,7 +395,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { allocation = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, priority, reservedContainer, schedulingMode, + node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); allocation.requestNodeType = requestType; @@ -403,21 +412,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } private ContainerAllocation assignContainer(Resource clusterResource, - FiCaSchedulerNode node, Priority priority, ResourceRequest request, - NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { + FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, + ResourceRequest request, NodeType type, RMContainer rmContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { lastResourceRequest = request; if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() - + " application=" + application.getApplicationId() - + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type); + + " application=" + application.getApplicationId() + + " priority=" + schedulerKey.getPriority() + + " request=" + request + " type=" + type); } // check if the resource request can access the label if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - request.getNodeLabelExpression(), node.getPartition(), schedulingMode)) { + request.getNodeLabelExpression(), node.getPartition(), + schedulingMode)) { // this is a reserved container, but we cannot allocate it now according // to label not match. 
This can be caused by node label changed // We should un-reserve this container. @@ -439,7 +449,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( - priority, capability); + schedulerKey, capability); // Can we allocate a container on this node? long availableContainers = @@ -504,8 +514,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { resourceNeedToUnReserve = capability; } unreservedContainer = - application.findNodeToUnreserve(clusterResource, node, priority, - resourceNeedToUnReserve); + application.findNodeToUnreserve(clusterResource, node, + schedulerKey, resourceNeedToUnReserve); // When (minimum-unreserved-resource > 0 OR we cannot allocate // new/reserved // container (That means we *have to* unreserve some resource to @@ -553,28 +563,28 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } } - boolean - shouldAllocOrReserveNewContainer(Priority priority, Resource required) { - int requiredContainers = application.getTotalRequiredResources(priority); - int reservedContainers = application.getNumReservedContainers(priority); + boolean shouldAllocOrReserveNewContainer( + SchedulerRequestKey schedulerKey, Resource required) { + int requiredContainers = + application.getTotalRequiredResources(schedulerKey); + int reservedContainers = application.getNumReservedContainers(schedulerKey); int starvation = 0; if (reservedContainers > 0) { - float nodeFactor = - Resources - .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation()); + float nodeFactor = Resources.ratio( + rc, required, application.getCSLeafQueue().getMaximumAllocation()); // Use percentage of node required to bias against large containers... // Protect against corner case where you need the whole node with // Math.min(nodeFactor, minimumAllocationFactor) starvation = - (int) ((application.getReReservations(priority) / + (int) ((application.getReReservations(schedulerKey) / (float) reservedContainers) * (1.0f - (Math.min( nodeFactor, application.getCSLeafQueue() .getMinimumAllocationFactor())))); if (LOG.isDebugEnabled()) { LOG.debug("needsContainers:" + " app.#re-reserve=" - + application.getReReservations(priority) + " reserved=" + + application.getReReservations(schedulerKey) + " reserved=" + reservedContainers + " nodeFactor=" + nodeFactor + " minAllocFactor=" + application.getCSLeafQueue().getMinimumAllocationFactor() @@ -585,13 +595,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } private Container getContainer(RMContainer rmContainer, - FiCaSchedulerNode node, Resource capability, Priority priority) { + FiCaSchedulerNode node, Resource capability, + SchedulerRequestKey schedulerKey) { return (rmContainer != null) ? 
rmContainer.getContainer() - : createContainer(node, capability, priority); + : createContainer(node, capability, schedulerKey); } private Container createContainer(FiCaSchedulerNode node, Resource capability, - Priority priority) { + SchedulerRequestKey schedulerKey) { NodeId nodeId = node.getRMNode().getNodeID(); ContainerId containerId = @@ -600,22 +611,23 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Create the container return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, null); + .getHttpAddress(), capability, schedulerKey.getPriority(), null); } private ContainerAllocation handleNewContainerAllocation( ContainerAllocation allocationResult, FiCaSchedulerNode node, - Priority priority, RMContainer reservedContainer, Container container) { + SchedulerRequestKey schedulerKey, RMContainer reservedContainer, + Container container) { // Handling container allocation // Did we previously reserve containers at this 'priority'? if (reservedContainer != null) { - application.unreserve(priority, node, reservedContainer); + application.unreserve(schedulerKey, node, reservedContainer); } // Inform the application RMContainer allocatedContainer = application.allocate(allocationResult.containerNodeType, node, - priority, lastResourceRequest, container); + schedulerKey, lastResourceRequest, container); // Does the application need this resource? if (allocatedContainer == null) { @@ -637,12 +649,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } ContainerAllocation doAllocation(ContainerAllocation allocationResult, - FiCaSchedulerNode node, Priority priority, + FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { // Create the container if necessary Container container = getContainer(reservedContainer, node, - allocationResult.getResourceToBeAllocated(), priority); + allocationResult.getResourceToBeAllocated(), schedulerKey); // something went wrong getting/creating the container if (container == null) { @@ -655,11 +667,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) { // When allocating container allocationResult = - handleNewContainerAllocation(allocationResult, node, priority, + handleNewContainerAllocation(allocationResult, node, schedulerKey, reservedContainer, container); } else { // When reserving container - application.reserve(priority, node, reservedContainer, container); + application.reserve(schedulerKey, node, reservedContainer, container); } allocationResult.updatedContainer = container; @@ -678,14 +690,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // RACK_LOCAL without delay. if (allocationResult.containerNodeType == NodeType.NODE_LOCAL || application.getCSLeafQueue().getRackLocalityFullReset()) { - application.resetSchedulingOpportunities(priority); + application.resetSchedulingOpportunities(schedulerKey); } } // Non-exclusive scheduling opportunity is different: we need reset // it every time to make sure non-labeled resource request will be // most likely allocated on non-labeled nodes first. 
- application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority); + application.resetMissedNonPartitionedRequestSchedulingOpportunity( + schedulerKey); } return allocationResult; @@ -693,15 +706,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { private ContainerAllocation allocate(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, Priority priority, + ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { ContainerAllocation result = preAllocation(clusterResource, node, schedulingMode, resourceLimits, - priority, reservedContainer); + schedulerKey, reservedContainer); if (AllocationState.ALLOCATED == result.state || AllocationState.RESERVED == result.state) { - result = doAllocation(result, node, priority, reservedContainer); + result = doAllocation(result, node, schedulerKey, reservedContainer); } return result; @@ -725,10 +738,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // Schedule in priority order - for (Priority priority : application.getPriorities()) { + for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { ContainerAllocation result = allocate(clusterResource, node, schedulingMode, resourceLimits, - priority, null); + schedulerKey, null); AllocationState allocationState = result.getAllocationState(); if (allocationState == AllocationState.PRIORITY_SKIPPED) { @@ -744,7 +757,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } else { ContainerAllocation result = allocate(clusterResource, node, schedulingMode, resourceLimits, - reservedContainer.getReservedPriority(), reservedContainer); + reservedContainer.getReservedSchedulerKey(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, reservedContainer); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 8009580..67d93a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; @@ -181,7 +182,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, - Priority priority, ResourceRequest request, + SchedulerRequestKey schedulerKey, ResourceRequest request, Container container) { if (isStopped) { @@ -190,10 +191,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(priority) <= 0) { + if (getTotalRequiredResources(schedulerKey) <= 0) { return null; } - + // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, this.getApplicationAttemptId(), @@ -211,7 +212,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( - type, node, priority, request, container); + type, node, schedulerKey, request, container); attemptResourceUsage.incUsed(node.getPartition(), container.getResource()); @@ -235,13 +236,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return rmContainer; } - public synchronized boolean unreserve(Priority priority, + public synchronized boolean unreserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer) { // Cancel increase request (if it has reserved increase request rmContainer.cancelIncreaseReservation(); // Done with the reservation? 
- if (internalUnreserve(node, priority)) { + if (internalUnreserve(node, schedulerKey)) { node.unreserveResource(this); // Update reserved metrics @@ -254,12 +255,14 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return false; } - private boolean internalUnreserve(FiCaSchedulerNode node, Priority priority) { + private boolean internalUnreserve(FiCaSchedulerNode node, + SchedulerRequestKey schedulerKey) { Map reservedContainers = - this.reservedContainers.get(priority); + this.reservedContainers.get(schedulerKey); if (reservedContainers != null) { - RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); + RMContainer reservedContainer = + reservedContainers.remove(node.getNodeID()); // unreserve is now triggered in new scenarios (preemption) // as a consequence reservedcontainer might be null, adding NP-checks @@ -268,17 +271,18 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { && reservedContainer.getContainer().getResource() != null) { if (reservedContainers.isEmpty()) { - this.reservedContainers.remove(priority); + this.reservedContainers.remove(schedulerKey); } // Reset the re-reservation count - resetReReservations(priority); + resetReReservations(schedulerKey); Resource resource = reservedContainer.getReservedResource(); this.attemptResourceUsage.decReserved(node.getPartition(), resource); LOG.info("Application " + getApplicationId() + " unreserved " + " on node " + node + ", currently has " - + reservedContainers.size() + " at priority " + priority + + reservedContainers.size() + + " at priority " + schedulerKey.getPriority() + "; currentReservation " + this.attemptResourceUsage.getReserved() + " on node-label=" + node.getPartition()); return true; @@ -288,10 +292,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } public synchronized float getLocalityWaitFactor( - Priority priority, int clusterNodes) { + SchedulerRequestKey schedulerKey, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) int requiredResources = - Math.max(this.getResourceRequests(priority).size() - 1, 0); + Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0); // waitFactor can't be more than '1' // i.e. 
no point skipping more than clustersize opportunities @@ -354,14 +358,14 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { newlyIncreasedContainers, newlyDecreasedContainers); } - synchronized public NodeId getNodeIdToUnreserve(Priority priority, - Resource resourceNeedUnreserve, ResourceCalculator rc, - Resource clusterResource) { + synchronized public NodeId getNodeIdToUnreserve( + SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve, + ResourceCalculator rc, Resource clusterResource) { // first go around make this algorithm simple and just grab first // reservation that has enough resources Map reservedContainers = this.reservedContainers - .get(priority); + .get(schedulerKey); if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { for (Map.Entry entry : reservedContainers.entrySet()) { @@ -417,17 +421,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { ((FiCaSchedulerApp) appAttempt).getHeadroomProvider(); } - public boolean reserveIncreasedContainer(Priority priority, + public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer, Resource reservedResource) { // Inform the application - if (super.reserveIncreasedContainer(node, priority, rmContainer, + if (super.reserveIncreasedContainer(node, schedulerKey, rmContainer, reservedResource)) { queue.getMetrics().reserveResource(getUser(), reservedResource); // Update the node - node.reserveResource(this, priority, rmContainer); + node.reserveResource(this, schedulerKey, rmContainer); // Succeeded return true; @@ -436,7 +440,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return false; } - public void reserve(Priority priority, + public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer, Container container) { // Update reserved metrics if this is the first reservation if (rmContainer == null) { @@ -445,19 +449,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } // Inform the application - rmContainer = super.reserve(node, priority, rmContainer, container); + rmContainer = super.reserve(node, schedulerKey, rmContainer, container); // Update the node - node.reserveResource(this, priority, rmContainer); + node.reserveResource(this, schedulerKey, rmContainer); } @VisibleForTesting public RMContainer findNodeToUnreserve(Resource clusterResource, - FiCaSchedulerNode node, Priority priority, + FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, Resource minimumUnreservedResource) { // need to unreserve some other container first NodeId idToUnreserve = - getNodeIdToUnreserve(priority, minimumUnreservedResource, + getNodeIdToUnreserve(schedulerKey, minimumUnreservedResource, rc, clusterResource); if (idToUnreserve == null) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index f90a53c..d79fcaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -23,12 +23,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -54,7 +55,7 @@ public class FiCaSchedulerNode extends SchedulerNode { @Override public synchronized void reserveResource( - SchedulerApplicationAttempt application, Priority priority, + SchedulerApplicationAttempt application, SchedulerRequestKey priority, RMContainer container) { // Check if it's already reserved RMContainer reservedContainer = getReservedContainer(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 1eead9a..8f074cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import 
org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -97,8 +98,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * at the current allowed level and the time since the last container * was scheduled. Currently we use only the former. */ - private final Map allowedLocalityLevel = - new HashMap(); + private final Map allowedLocalityLevel = + new HashMap<>(); public FSAppAttempt(FairScheduler scheduler, ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue, @@ -163,23 +164,23 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } private synchronized void unreserveInternal( - Priority priority, FSSchedulerNode node) { + SchedulerRequestKey schedulerKey, FSSchedulerNode node) { Map reservedContainers = - this.reservedContainers.get(priority); + this.reservedContainers.get(schedulerKey); RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); if (reservedContainers.isEmpty()) { - this.reservedContainers.remove(priority); + this.reservedContainers.remove(schedulerKey); } // Reset the re-reservation count - resetReReservations(priority); + resetReReservations(schedulerKey); Resource resource = reservedContainer.getContainer().getResource(); this.attemptResourceUsage.decReserved(resource); LOG.info("Application " + getApplicationId() + " unreserved " + " on node " + node + ", currently has " + reservedContainers.size() - + " at priority " + priority + "; currentReservation " + + " at priority " + schedulerKey.getPriority() + "; currentReservation " + this.attemptResourceUsage.getReserved()); } @@ -239,10 +240,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } public synchronized float getLocalityWaitFactor( - Priority priority, int clusterNodes) { + SchedulerRequestKey schedulerKey, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) int requiredResources = - Math.max(this.getResourceRequests(priority).size() - 1, 0); + Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0); // waitFactor can't be more than '1' // i.e. no point skipping more than clustersize opportunities @@ -254,9 +255,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * current size of the cluster and thresholds indicating how many nodes to * fail at (as a fraction of cluster size) before relaxing scheduling * constraints. 
+ * @param schedulerKey SchedulerRequestKey + * @param numNodes Num Nodes + * @param nodeLocalityThreshold nodeLocalityThreshold + * @param rackLocalityThreshold rackLocalityThreshold + * @return NodeType */ - public synchronized NodeType getAllowedLocalityLevel(Priority priority, - int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) { + public synchronized NodeType getAllowedLocalityLevel( + SchedulerRequestKey schedulerKey, int numNodes, + double nodeLocalityThreshold, double rackLocalityThreshold) { // upper limit on threshold if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; } if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; } @@ -267,12 +274,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } // Default level is NODE_LOCAL - if (!allowedLocalityLevel.containsKey(priority)) { - allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + if (!allowedLocalityLevel.containsKey(schedulerKey)) { + allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); return NodeType.NODE_LOCAL; } - NodeType allowed = allowedLocalityLevel.get(priority); + NodeType allowed = allowedLocalityLevel.get(schedulerKey); // If level is already most liberal, we're done if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH; @@ -281,27 +288,32 @@ public class FSAppAttempt extends SchedulerApplicationAttempt rackLocalityThreshold; // Relax locality constraints once we've surpassed threshold. - if (getSchedulingOpportunities(priority) > (numNodes * threshold)) { + if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) { if (allowed.equals(NodeType.NODE_LOCAL)) { - allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); - resetSchedulingOpportunities(priority); + allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(schedulerKey); } else if (allowed.equals(NodeType.RACK_LOCAL)) { - allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); - resetSchedulingOpportunities(priority); + allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(schedulerKey); } } - return allowedLocalityLevel.get(priority); + return allowedLocalityLevel.get(schedulerKey); } /** * Return the level at which we are allowed to schedule containers. * Given the thresholds indicating how much time passed before relaxing * scheduling constraints. 
+ * @param schedulerKey SchedulerRequestKey + * @param nodeLocalityDelayMs nodeLocalityThreshold + * @param rackLocalityDelayMs nodeLocalityDelayMs + * @param currentTimeMs currentTimeMs + * @return NodeType */ - public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, - long nodeLocalityDelayMs, long rackLocalityDelayMs, - long currentTimeMs) { + public synchronized NodeType getAllowedLocalityLevelByTime( + SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs, + long rackLocalityDelayMs, long currentTimeMs) { // if not being used, can schedule anywhere if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { @@ -309,19 +321,19 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } // default level is NODE_LOCAL - if (!allowedLocalityLevel.containsKey(priority)) { + if (!allowedLocalityLevel.containsKey(schedulerKey)) { // add the initial time of priority to prevent comparing with FsApp // startTime and allowedLocalityLevel degrade - lastScheduledContainer.put(priority, currentTimeMs); + lastScheduledContainer.put(schedulerKey, currentTimeMs); if (LOG.isDebugEnabled()) { - LOG.debug("Init the lastScheduledContainer time, priority: " + priority - + ", time: " + currentTimeMs); + LOG.debug("Init the lastScheduledContainer time, priority: " + + schedulerKey.getPriority() + ", time: " + currentTimeMs); } - allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); return NodeType.NODE_LOCAL; } - NodeType allowed = allowedLocalityLevel.get(priority); + NodeType allowed = allowedLocalityLevel.get(schedulerKey); // if level is already most liberal, we're done if (allowed.equals(NodeType.OFF_SWITCH)) { @@ -330,8 +342,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // check waiting time long waitTime = currentTimeMs; - if (lastScheduledContainer.containsKey(priority)) { - waitTime -= lastScheduledContainer.get(priority); + if (lastScheduledContainer.containsKey(schedulerKey)) { + waitTime -= lastScheduledContainer.get(schedulerKey); } else { waitTime -= getStartTime(); } @@ -341,43 +353,43 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (waitTime > thresholdTime) { if (allowed.equals(NodeType.NODE_LOCAL)) { - allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); - resetSchedulingOpportunities(priority, currentTimeMs); + allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(schedulerKey, currentTimeMs); } else if (allowed.equals(NodeType.RACK_LOCAL)) { - allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); - resetSchedulingOpportunities(priority, currentTimeMs); + allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(schedulerKey, currentTimeMs); } } - return allowedLocalityLevel.get(priority); + return allowedLocalityLevel.get(schedulerKey); } synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, - Priority priority, ResourceRequest request, + SchedulerRequestKey schedulerKey, ResourceRequest request, Container reservedContainer) { // Update allowed locality level - NodeType allowed = allowedLocalityLevel.get(priority); + NodeType allowed = allowedLocalityLevel.get(schedulerKey); if (allowed != null) { if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) { - this.resetAllowedLocalityLevel(priority, type); + this.resetAllowedLocalityLevel(schedulerKey, type); } else if 
(allowed.equals(NodeType.RACK_LOCAL) && type.equals(NodeType.NODE_LOCAL)) { - this.resetAllowedLocalityLevel(priority, type); + this.resetAllowedLocalityLevel(schedulerKey, type); } } // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(priority) <= 0) { + if (getTotalRequiredResources(schedulerKey) <= 0) { return null; } Container container = reservedContainer; if (container == null) { container = - createContainer(node, request.getCapability(), request.getPriority()); + createContainer(node, request.getCapability(), schedulerKey); } // Create RMContainer @@ -392,7 +404,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( - type, node, priority, request, container); + type, node, schedulerKey, request, container); this.attemptResourceUsage.incUsed(container.getResource()); // Update resource requests related to "request" and store in RMContainer @@ -419,13 +431,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * Should be called when the scheduler assigns a container at a higher * degree of locality than the current threshold. Reset the allowed locality * level to a higher degree of locality. + * @param schedulerKey Scheduler Key + * @param level NodeType */ - public synchronized void resetAllowedLocalityLevel(Priority priority, - NodeType level) { - NodeType old = allowedLocalityLevel.get(priority); + public synchronized void resetAllowedLocalityLevel( + SchedulerRequestKey schedulerKey, NodeType level) { + NodeType old = allowedLocalityLevel.get(schedulerKey); LOG.info("Raising locality level from " + old + " to " + level + " at " + - " priority " + priority); - allowedLocalityLevel.put(priority, level); + " priority " + schedulerKey.getPriority()); + allowedLocalityLevel.put(schedulerKey, level); } // related methods @@ -468,9 +482,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * Create and return a container object reflecting an allocation for the * given appliction on the given node with the given capability and * priority. 
+ * @param node Node + * @param capability Capability + * @param schedulerKey Scheduler Key + * @return Container */ - public Container createContainer( - FSSchedulerNode node, Resource capability, Priority priority) { + public Container createContainer(FSSchedulerNode node, Resource capability, + SchedulerRequestKey schedulerKey) { NodeId nodeId = node.getRMNode().getNodeID(); ContainerId containerId = BuilderUtils.newContainerId( @@ -479,7 +497,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, null); + .getHttpAddress(), capability, schedulerKey.getPriority(), null); return container; } @@ -492,26 +510,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * return whether reservation was possible with the current threshold limits */ private boolean reserve(ResourceRequest request, FSSchedulerNode node, - Container reservedContainer, NodeType type) { + Container reservedContainer, NodeType type, + SchedulerRequestKey schedulerKey) { - Priority priority = request.getPriority(); if (!reservationExceedsThreshold(node, type)) { LOG.info("Making reservation: node=" + node.getNodeName() + " app_id=" + getApplicationId()); if (reservedContainer == null) { reservedContainer = createContainer(node, request.getCapability(), - request.getPriority()); + schedulerKey); getMetrics().reserveResource(getUser(), reservedContainer.getResource()); RMContainer rmContainer = - super.reserve(node, priority, null, reservedContainer); - node.reserveResource(this, priority, rmContainer); + super.reserve(node, schedulerKey, null, reservedContainer); + node.reserveResource(this, schedulerKey, rmContainer); setReservation(node); } else { RMContainer rmContainer = node.getReservedContainer(); - super.reserve(node, priority, rmContainer, reservedContainer); - node.reserveResource(this, priority, rmContainer); + super.reserve(node, schedulerKey, rmContainer, reservedContainer); + node.reserveResource(this, schedulerKey, rmContainer); setReservation(node); } return true; @@ -548,13 +566,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } return false; } + /** - * Remove the reservation on {@code node} at the given {@link Priority}. + * Remove the reservation on {@code node} at the given SchedulerRequestKey. * This dispatches SchedulerNode handlers as well. + * @param schedulerKey Scheduler Key + * @param node Node */ - public void unreserve(Priority priority, FSSchedulerNode node) { + public void unreserve(SchedulerRequestKey schedulerKey, + FSSchedulerNode node) { RMContainer rmContainer = node.getReservedContainer(); - unreserveInternal(priority, node); + unreserveInternal(schedulerKey, node); node.unreserveResource(this); clearReservation(node); getMetrics().unreserveResource( @@ -618,7 +640,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt */ private Resource assignContainer( FSSchedulerNode node, ResourceRequest request, NodeType type, - boolean reserved) { + boolean reserved, SchedulerRequestKey schedulerKey) { // How much does this request need? 
Resource capability = request.getCapability(); @@ -635,19 +657,19 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (Resources.fitsIn(capability, available)) { // Inform the application of the new container for this request RMContainer allocatedContainer = - allocate(type, node, request.getPriority(), request, + allocate(type, node, schedulerKey, request, reservedContainer); if (allocatedContainer == null) { // Did the application need this resource? if (reserved) { - unreserve(request.getPriority(), node); + unreserve(schedulerKey, node); } return Resources.none(); } // If we had previously made a reservation, delete it if (reserved) { - unreserve(request.getPriority(), node); + unreserve(schedulerKey, node); } // Inform the node @@ -667,7 +689,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // The desired container won't fit here, so reserve if (isReservable(capability) && - reserve(request, node, reservedContainer, type)) { + reserve(request, node, reservedContainer, type, schedulerKey)) { return FairScheduler.CONTAINER_RESERVED; } else { if (LOG.isDebugEnabled()) { @@ -683,8 +705,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt getQueue().getPolicy().getResourceCalculator(), capacity); } - private boolean hasNodeOrRackLocalRequests(Priority priority) { - return getResourceRequests(priority).size() > 1; + private boolean hasNodeOrRackLocalRequests(SchedulerRequestKey schedulerKey) { + return getResourceRequests(schedulerKey).size() > 1; } /** @@ -707,26 +729,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); } - Collection prioritiesToTry = (reserved) ? - Arrays.asList(node.getReservedContainer().getReservedPriority()) : - getPriorities(); + Collection keysToTry = (reserved) ? + Arrays.asList(node.getReservedContainer().getReservedSchedulerKey()) : + getSchedulerKeys(); // For each priority, see if we can schedule a node local, rack local // or off-switch request. Rack of off-switch requests may be delayed // (not scheduled) in order to promote better locality. synchronized (this) { - for (Priority priority : prioritiesToTry) { + for (SchedulerRequestKey schedulerKey : keysToTry) { // Skip it for reserved container, since // we already check it in isValidReservation. 
- if (!reserved && !hasContainerForNode(priority, node)) { + if (!reserved && !hasContainerForNode(schedulerKey, node)) { continue; } - addSchedulingOpportunity(priority); + addSchedulingOpportunity(schedulerKey); - ResourceRequest rackLocalRequest = getResourceRequest(priority, + ResourceRequest rackLocalRequest = getResourceRequest(schedulerKey, node.getRackName()); - ResourceRequest localRequest = getResourceRequest(priority, + ResourceRequest localRequest = getResourceRequest(schedulerKey, node.getNodeName()); if (localRequest != null && !localRequest.getRelaxLocality()) { @@ -736,12 +758,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt NodeType allowedLocality; if (scheduler.isContinuousSchedulingEnabled()) { - allowedLocality = getAllowedLocalityLevelByTime(priority, + allowedLocality = getAllowedLocalityLevelByTime(schedulerKey, scheduler.getNodeLocalityDelayMs(), scheduler.getRackLocalityDelayMs(), scheduler.getClock().getTime()); } else { - allowedLocality = getAllowedLocalityLevel(priority, + allowedLocality = getAllowedLocalityLevel(schedulerKey, scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(), scheduler.getRackLocalityThreshold()); @@ -750,7 +772,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 && localRequest != null && localRequest.getNumContainers() != 0) { return assignContainer(node, localRequest, - NodeType.NODE_LOCAL, reserved); + NodeType.NODE_LOCAL, reserved, schedulerKey); } if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) { @@ -761,21 +783,22 @@ public class FSAppAttempt extends SchedulerApplicationAttempt && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality.equals(NodeType.OFF_SWITCH))) { return assignContainer(node, rackLocalRequest, - NodeType.RACK_LOCAL, reserved); + NodeType.RACK_LOCAL, reserved, schedulerKey); } ResourceRequest offSwitchRequest = - getResourceRequest(priority, ResourceRequest.ANY); + getResourceRequest(schedulerKey, ResourceRequest.ANY); if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { continue; } if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0) { - if (!hasNodeOrRackLocalRequests(priority) || + if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality.equals(NodeType.OFF_SWITCH)) { return assignContainer( - node, offSwitchRequest, NodeType.OFF_SWITCH, reserved); + node, offSwitchRequest, NodeType.OFF_SWITCH, reserved, + schedulerKey); } } } @@ -787,10 +810,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * Whether this app has containers requests that could be satisfied on the * given node, if the node had full space. 
@@ -787,10 +810,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * Whether this app has container requests that could be satisfied on the
    * given node, if the node had full space.
    */
-  private boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
-    ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY);
-    ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName());
-    ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName());
+  private boolean hasContainerForNode(SchedulerRequestKey key,
+      FSSchedulerNode node) {
+    ResourceRequest anyRequest = getResourceRequest(key, ResourceRequest.ANY);
+    ResourceRequest rackRequest = getResourceRequest(key, node.getRackName());
+    ResourceRequest nodeRequest = getResourceRequest(key, node.getNodeName());
 
     return
         // There must be outstanding requests at the given priority:
@@ -812,9 +836,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   private boolean isValidReservation(FSSchedulerNode node) {
-    Priority reservedPriority = node.getReservedContainer().
-        getReservedPriority();
-    return hasContainerForNode(reservedPriority, node) &&
+    SchedulerRequestKey schedulerKey = node.getReservedContainer().
+        getReservedSchedulerKey();
+    return hasContainerForNode(schedulerKey, node) &&
         !isOverAMShareLimit();
   }
 
@@ -830,13 +854,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    */
   public boolean assignReservedContainer(FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    Priority reservedPriority = rmContainer.getReservedPriority();
+    SchedulerRequestKey reservedSchedulerKey =
+        rmContainer.getReservedSchedulerKey();
 
     if (!isValidReservation(node)) {
       // Don't hold the reservation if app can no longer use it
       LOG.info("Releasing reservation that cannot be satisfied for " +
           "application " + getApplicationAttemptId() + " on node " + node);
-      unreserve(reservedPriority, node);
+      unreserve(reservedSchedulerKey, node);
       return false;
     }
 
@@ -938,8 +963,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     // Add up outstanding resource requests
     synchronized (this) {
-      for (Priority p : getPriorities()) {
-        ResourceRequest r = getResourceRequest(p, ResourceRequest.ANY);
+      for (SchedulerRequestKey k : getSchedulerKeys()) {
+        ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY);
         if (r != null) {
           Resources.multiplyAndAddTo(demand,
               r.getCapability(), r.getNumContainers());
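Reservations in the hunks above now carry the full SchedulerRequestKey rather than a bare Priority, so a reservation stays tied to the exact request it was made for, and isValidReservation() can check whether that request is still outstanding. A toy model of that pairing (invented types; the real state lives on RMContainer and FSSchedulerNode):

import java.util.HashSet;
import java.util.Set;

public class ReservationSketch {
  // Invented stand-in for SchedulerRequestKey, reduced to one field.
  static final class RequestKey {
    final int priority;
    RequestKey(int priority) { this.priority = priority; }
    @Override public boolean equals(Object o) {
      return o instanceof RequestKey && ((RequestKey) o).priority == priority;
    }
    @Override public int hashCode() { return priority; }
  }

  // Shaped like isValidReservation(): a reservation is kept only while
  // the app still has an outstanding request under the reserved key.
  static boolean isValidReservation(RequestKey reservedKey,
      Set<RequestKey> outstandingKeys) {
    return outstandingKeys.contains(reservedKey);
  }

  public static void main(String[] args) {
    Set<RequestKey> outstanding = new HashSet<>();
    outstanding.add(new RequestKey(1));
    RequestKey reserved = new RequestKey(1);

    System.out.println(isValidReservation(reserved, outstanding)); // true: keep
    outstanding.clear();  // request satisfied or cancelled elsewhere
    System.out.println(isValidReservation(reserved, outstanding)); // false: unreserve
  }
}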
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index c86201a..024ec67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -23,10 +23,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 @Private
@@ -43,7 +43,7 @@ public class FSSchedulerNode extends SchedulerNode {
 
   @Override
   public synchronized void reserveResource(
-      SchedulerApplicationAttempt application, Priority priority,
+      SchedulerApplicationAttempt application, SchedulerRequestKey schedulerKey,
       RMContainer container) {
     // Check if it's already reserved
     RMContainer reservedContainer = getReservedContainer();
@@ -102,4 +102,5 @@ public class FSSchedulerNode extends SchedulerNode {
   public synchronized FSAppAttempt getReservedAppSchedulable() {
     return reservedAppSchedulable;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index bc953ba..ac384a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -866,7 +866,7 @@ public class FairScheduler extends
     FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
 
     if (rmContainer.getState() == RMContainerState.RESERVED) {
-      application.unreserve(rmContainer.getReservedPriority(), node);
+      application.unreserve(rmContainer.getReservedSchedulerKey(), node);
     } else {
       application.containerCompleted(rmContainer, containerStatus, event);
       node.releaseContainer(container);
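Across all three schedulers the patch only ever calls getPriority() on the key, compares keys, and iterates them in order, which suggests the minimal shape of the opaque type. A sketch consistent with those call sites, assuming ordering by numeric priority as in YARN's Priority (this is not the class as committed):

import java.util.Objects;

// A minimal, illustrative stand-in for the opaque scheduler key: it
// only needs equality, hashing, ordering, and access to the priority.
public final class SchedulerKeySketch implements Comparable<SchedulerKeySketch> {
  private final int priority;

  public SchedulerKeySketch(int priority) {
    this.priority = priority;
  }

  public int getPriority() {
    return priority;
  }

  // Lower numeric value = more urgent, so sorted iteration over keys
  // visits the most urgent requests first.
  @Override
  public int compareTo(SchedulerKeySketch other) {
    return Integer.compare(priority, other.priority);
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof SchedulerKeySketch
        && ((SchedulerKeySketch) o).priority == priority;
  }

  @Override
  public int hashCode() {
    return Objects.hash(priority);
  }
}

Because the rest of the scheduler goes through this type rather than Priority directly, later changes can add fields to the key without touching every call site, which appears to be the point of making it opaque.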
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index eaab495..fe8d0af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -514,14 +515,15 @@ public class FifoScheduler extends
         continue;
       }
 
-      for (Priority priority : application.getPriorities()) {
-        int maxContainers =
-          getMaxAllocatableContainers(application, priority, node,
-            NodeType.OFF_SWITCH);
+      for (SchedulerRequestKey schedulerKey :
+          application.getSchedulerKeys()) {
+        int maxContainers =
+            getMaxAllocatableContainers(application, schedulerKey, node,
+                NodeType.OFF_SWITCH);
         // Ensure the application needs containers of this priority
         if (maxContainers > 0) {
-          int assignedContainers =
-            assignContainersOnNode(node, application, priority);
+          int assignedContainers =
+              assignContainersOnNode(node, application, schedulerKey);
           // Do not assign out of order w.r.t priorities
           if (assignedContainers == 0) {
             break;
@@ -553,11 +555,11 @@ public class FifoScheduler extends
   }
 
   private int getMaxAllocatableContainers(FiCaSchedulerApp application,
-      Priority priority, FiCaSchedulerNode node, NodeType type) {
+      SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, NodeType type) {
     int maxContainers = 0;
-
-    ResourceRequest offSwitchRequest =
-      application.getResourceRequest(priority, ResourceRequest.ANY);
+
+    ResourceRequest offSwitchRequest =
+        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (offSwitchRequest != null) {
       maxContainers = offSwitchRequest.getNumContainers();
     }
@@ -567,8 +569,9 @@ public class FifoScheduler extends
     }
 
     if (type == NodeType.RACK_LOCAL) {
-      ResourceRequest rackLocalRequest =
-        application.getResourceRequest(priority, node.getRMNode().getRackName());
+      ResourceRequest rackLocalRequest =
+          application.getResourceRequest(schedulerKey, node.getRMNode()
+              .getRackName());
       if (rackLocalRequest == null) {
         return maxContainers;
       }
@@ -577,8 +580,9 @@ public class FifoScheduler extends
     }
 
     if (type == NodeType.NODE_LOCAL) {
-      ResourceRequest nodeLocalRequest =
-        application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
+      ResourceRequest nodeLocalRequest =
+          application.getResourceRequest(schedulerKey, node.getRMNode()
+              .getNodeAddress());
       if (nodeLocalRequest != null) {
         maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
       }
@@ -589,25 +593,25 @@ public class FifoScheduler extends
 
   private int assignContainersOnNode(FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority
+      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey
   ) {
     // Data-local
-    int nodeLocalContainers =
-      assignNodeLocalContainers(node, application, priority);
+    int nodeLocalContainers =
+        assignNodeLocalContainers(node, application, schedulerKey);
 
     // Rack-local
-    int rackLocalContainers =
-      assignRackLocalContainers(node, application, priority);
+    int rackLocalContainers =
+        assignRackLocalContainers(node, application, schedulerKey);
 
     // Off-switch
     int offSwitchContainers =
-      assignOffSwitchContainers(node, application, priority);
+        assignOffSwitchContainers(node, application, schedulerKey);
 
     LOG.debug("assignContainersOnNode:" +
         " node=" + node.getRMNode().getNodeAddress() +
         " application=" + application.getApplicationId().getId() +
-        " priority=" + priority.getPriority() +
+        " priority=" + schedulerKey.getPriority() +
         " #assigned=" +
         (nodeLocalContainers + rackLocalContainers + offSwitchContainers));
 
@@ -616,14 +620,14 @@ public class FifoScheduler extends
   }
 
   private int assignNodeLocalContainers(FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority) {
+      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request =
-      application.getResourceRequest(priority, node.getNodeName());
+    ResourceRequest request =
+        application.getResourceRequest(schedulerKey, node.getNodeName());
     if (request != null) {
       // Don't allocate on this node if we don't need containers on this rack
       ResourceRequest rackRequest =
-          application.getResourceRequest(priority,
+          application.getResourceRequest(schedulerKey,
               node.getRMNode().getRackName());
       if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
         return 0;
@@ -631,61 +635,62 @@ public class FifoScheduler extends
 
       int assignableContainers =
         Math.min(
-            getMaxAllocatableContainers(application, priority, node,
+            getMaxAllocatableContainers(application, schedulerKey, node,
                 NodeType.NODE_LOCAL),
             request.getNumContainers());
       assignedContainers =
-        assignContainer(node, application, priority,
+          assignContainer(node, application, schedulerKey,
             assignableContainers, request, NodeType.NODE_LOCAL);
     }
     return assignedContainers;
   }
 
   private int assignRackLocalContainers(FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority) {
+      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request =
-      application.getResourceRequest(priority, node.getRMNode().getRackName());
+    ResourceRequest request =
+        application.getResourceRequest(schedulerKey, node.getRMNode()
+            .getRackName());
    if (request != null) {
      // Don't allocate on this rack if the application doesn't need containers
      ResourceRequest offSwitchRequest =
-          application.getResourceRequest(priority, ResourceRequest.ANY);
+          application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
      if (offSwitchRequest.getNumContainers() <= 0) {
        return 0;
      }
 
      int assignableContainers = Math.min(
-          getMaxAllocatableContainers(application, priority, node,
+          getMaxAllocatableContainers(application, schedulerKey, node,
              NodeType.RACK_LOCAL),
          request.getNumContainers());
      assignedContainers =
-        assignContainer(node, application, priority,
+          assignContainer(node, application, schedulerKey,
            assignableContainers, request, NodeType.RACK_LOCAL);
    }
    return assignedContainers;
  }
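The FifoScheduler changes above rely on getSchedulerKeys() yielding keys in priority order, and on breaking out as soon as a key gets no containers, which is what keeps assignment FIFO with respect to priorities. A compact model of that loop over a sorted key set (stand-in types, not the scheduler API):

import java.util.Map;
import java.util.TreeMap;

public class FifoOrderSketch {
  public static void main(String[] args) {
    // priority -> containers still wanted; TreeMap stands in for the
    // sorted key set the scheduler iterates (lower value = more urgent).
    Map<Integer, Integer> demand = new TreeMap<>();
    demand.put(1, 2);   // most urgent key wants 2 containers
    demand.put(5, 4);   // less urgent key wants 4 containers

    int free = 3;       // containers this node can still host
    for (Map.Entry<Integer, Integer> e : demand.entrySet()) {
      int assigned = Math.min(free, e.getValue());
      free -= assigned;
      System.out.println("priority " + e.getKey() + ": assigned " + assigned);
      if (assigned == 0) {
        break;          // nothing placed at this key: stop to stay in order
      }
    }
  }
}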
 
   private int assignOffSwitchContainers(FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority) {
+      FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request =
-      application.getResourceRequest(priority, ResourceRequest.ANY);
+    ResourceRequest request =
+        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (request != null) {
       assignedContainers =
-        assignContainer(node, application, priority,
+          assignContainer(node, application, schedulerKey,
             request.getNumContainers(), request, NodeType.OFF_SWITCH);
     }
     return assignedContainers;
   }
 
   private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
-      Priority priority, int assignableContainers,
+      SchedulerRequestKey schedulerKey, int assignableContainers,
       ResourceRequest request, NodeType type) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() +
         " application=" + application.getApplicationId().getId() +
-        " priority=" + priority.getPriority() +
+        " priority=" + schedulerKey.getPriority().getPriority() +
         " assignableContainers=" + assignableContainers +
         " request=" + request + " type=" + type);
     Resource capability = request.getCapability();
@@ -707,13 +712,13 @@ public class FifoScheduler extends
 
     // Create the container
     Container container =
         BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-            .getHttpAddress(), capability, priority, null);
+            .getHttpAddress(), capability, schedulerKey.getPriority(), null);
 
     // Allocate!
 
     // Inform the application
     RMContainer rmContainer =
-      application.allocate(type, node, priority, request, container);
+        application.allocate(type, node, schedulerKey, request, container);
 
     // Inform the node
     node.allocateContainer(rmContainer);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org