From: zjshen@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 27 Jul 2015 20:08:18 -0000
Message-Id: <00e6732e06a54ca99cfac974aa7284dd@git.apache.org>
In-Reply-To: <0e7baf2391424ef092872afc9ad1fa83@git.apache.org>
Subject: [36/50] [abbrv] hadoop git commit: YARN-3026. Move application-specific
 container allocation logic from LeafQueue to FiCaSchedulerApp. Contributed by
 Wangda Tan

YARN-3026. Move application-specific container allocation logic from LeafQueue
to FiCaSchedulerApp.
Contributed by Wangda Tan

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d725cf9d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d725cf9d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d725cf9d

Branch: refs/heads/YARN-2928
Commit: d725cf9da57255cf8e517a035e633d3f3c4abc80
Parents: 0c2ae54
Author: Jian He
Authored: Fri Jul 24 14:00:25 2015 -0700
Committer: Zhijie Shen
Committed: Mon Jul 27 12:57:36 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../server/resourcemanager/RMContextImpl.java   |   3 +-
 .../scheduler/ResourceLimits.java               |  19 +-
 .../scheduler/capacity/AbstractCSQueue.java     |  27 +-
 .../scheduler/capacity/CSAssignment.java        |  12 +-
 .../capacity/CapacityHeadroomProvider.java      |  16 +-
 .../scheduler/capacity/CapacityScheduler.java   |  14 -
 .../scheduler/capacity/LeafQueue.java           | 833 +++----------------
 .../scheduler/capacity/ParentQueue.java         |  16 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java | 721 +++++++++++++++-
 .../capacity/TestApplicationLimits.java         |  15 +-
 .../capacity/TestCapacityScheduler.java         |   3 +-
 .../capacity/TestContainerAllocation.java       |  85 +-
 .../scheduler/capacity/TestLeafQueue.java       | 191 +----
 .../scheduler/capacity/TestReservations.java    | 111 +--
 .../scheduler/capacity/TestUtils.java           |  25 +-
 16 files changed, 1048 insertions(+), 1046 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index fb21bf9..cd033f9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -460,6 +460,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3844. Make hadoop-yarn-project Native code -Wall-clean (Alan Burlison
     via Colin P. McCabe)
 
+    YARN-3026. Move application-specific container allocation logic from
+    LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not
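[Editor's note, before the per-file hunks: the shape of the refactor, as a minimal
hypothetical sketch. All types here (App, Queue, Outcome) are illustrative
stand-ins, not the real YARN classes; only the division of responsibilities
mirrors the commit.]

// LeafQueue keeps queue-wide checks (queue capacity, user limit) and delegates
// the per-application allocation walk (priorities, locality, reservations) to
// the application object, as FiCaSchedulerApp does after this commit.
import java.util.List;

class RefactorSketch {
  enum Outcome { ALLOCATED, SKIPPED, NONE }

  // Application-specific logic (priority order, node/rack/off-switch locality,
  // reserve/unreserve decisions) now lives on the application side.
  static class App {
    Outcome assignContainers(String node) {
      return Outcome.SKIPPED; // placeholder for the moved allocation walk
    }
  }

  // The queue keeps only the queue-wide checks and delegates the rest.
  static class Queue {
    boolean queueHasHeadroom() { return true; }       // queue max-capacity check
    boolean userWithinLimit(App app) { return true; } // user-limit check

    Outcome assignContainers(String node, List<App> apps) {
      for (App app : apps) {
        if (!queueHasHeadroom()) return Outcome.NONE; // queue-wide: stop entirely
        if (!userWithinLimit(app)) continue;          // per-user: try next app
        Outcome o = app.assignContainers(node);       // delegate to the app
        if (o == Outcome.ALLOCATED) return o;
        if (o != Outcome.SKIPPED) return Outcome.NONE; // respect app FIFO
      }
      return Outcome.NONE;
    }
  }
}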
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 3b33f23..772a246 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -293,7 +293,8 @@ public class RMContextImpl implements RMContext {
     activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
   }
 
-  void setScheduler(ResourceScheduler scheduler) {
+  @VisibleForTesting
+  public void setScheduler(ResourceScheduler scheduler) {
     activeServiceContext.setScheduler(scheduler);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
index 8074794..c545e9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -26,20 +26,25 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  * that, it's not "extra") resource you can get.
  */
 public class ResourceLimits {
-  volatile Resource limit;
+  private volatile Resource limit;
 
   // This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES
   // config. This limit indicates how much we need to unreserve to allocate
   // another container.
   private volatile Resource amountNeededUnreserve;
 
+  // How much resource you can use for next allocation, if this isn't enough for
+  // next container allocation, you may need to consider unreserve some
+  // containers.
+  private volatile Resource headroom;
+
   public ResourceLimits(Resource limit) {
-    this.amountNeededUnreserve = Resources.none();
-    this.limit = limit;
+    this(limit, Resources.none());
   }
 
   public ResourceLimits(Resource limit, Resource amountNeededUnreserve) {
     this.amountNeededUnreserve = amountNeededUnreserve;
+    this.headroom = limit;
     this.limit = limit;
   }
 
@@ -47,6 +52,14 @@ public class ResourceLimits {
     return limit;
   }
 
+  public Resource getHeadroom() {
+    return headroom;
+  }
+
+  public void setHeadroom(Resource headroom) {
+    this.headroom = headroom;
+  }
+
   public Resource getAmountNeededUnreserve() {
     return amountNeededUnreserve;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 7f8e164..dcc4205 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -65,7 +65,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   volatile int numContainers;
 
   final Resource minimumAllocation;
-  Resource maximumAllocation;
+  volatile Resource maximumAllocation;
   QueueState state;
   final CSQueueMetrics metrics;
   protected final PrivilegedEntity queueEntity;
@@ -77,7 +77,7 @@ public abstract class AbstractCSQueue implements CSQueue {
 
   Map<AccessType, AccessControlList> acls =
       new HashMap<AccessType, AccessControlList>();
-  boolean reservationsContinueLooking;
+  volatile boolean reservationsContinueLooking;
   private boolean preemptionDisabled;
 
   // Track resource usage-by-label like used-resource/pending-resource, etc.
@@ -333,7 +333,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
 
   @Private
-  public synchronized Resource getMaximumAllocation() {
+  public Resource getMaximumAllocation() {
     return maximumAllocation;
   }
 
@@ -448,13 +448,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
 
   synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      String nodePartition, ResourceLimits currentResourceLimits,
-      Resource nowRequired, Resource resourceCouldBeUnreserved,
+      String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved,
       SchedulingMode schedulingMode) {
-    // New total resource = used + required
-    Resource newTotalResource =
-        Resources.add(queueUsage.getUsed(nodePartition), nowRequired);
-
     // Get current limited resource:
     // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
     // queues' max capacity.
@@ -470,8 +465,14 @@ public abstract class AbstractCSQueue implements CSQueue {
         getCurrentLimitResource(nodePartition, clusterResource,
             currentResourceLimits, schedulingMode);
 
-    if (Resources.greaterThan(resourceCalculator, clusterResource,
-        newTotalResource, currentLimitResource)) {
+    Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
+
+    // Set headroom for currentResourceLimits
+    currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource,
+        nowTotalUsed));
+
+    if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
+        nowTotalUsed, currentLimitResource)) {
 
       // if reservation continous looking enabled, check to see if could we
       // potentially use this node instead of a reserved node if the application
@@ -483,7 +484,7 @@ public abstract class AbstractCSQueue implements CSQueue {
           resourceCouldBeUnreserved, Resources.none())) {
         // resource-without-reserved = used - reserved
         Resource newTotalWithoutReservedResource =
-            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+            Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved);
 
         // when total-used-without-reserved-resource < currentLimit, we still
         // have chance to allocate on this node by unreserving some containers
@@ -498,8 +499,6 @@ public abstract class AbstractCSQueue implements CSQueue {
                 + newTotalWithoutReservedResource + ", maxLimitCapacity: "
                 + currentLimitResource);
           }
-          currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
-              currentLimitResource));
           return true;
         }
       }
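[Editor's note: a worked example, with hypothetical numbers, of the headroom
bookkeeping the hunk above introduces in canAssignToThisQueue:
headroom = currentLimitResource - queueUsage.getUsed(partition).]

public class HeadroomExample {
  public static void main(String[] args) {
    int currentLimitMB = 100 * 1024; // queue's current limit in this partition
    int usedMB = 90 * 1024;          // resources the queue already uses there

    int headroomMB = currentLimitMB - usedMB; // 10 GB left before the limit

    // If the next container needs more than this headroom, the application
    // may have to unreserve one of its own reserved containers first.
    int containerMB = 16 * 1024;
    boolean mustConsiderUnreserving = containerMB > headroomMB; // true
    System.out.println(headroomMB + " " + mustConsiderUnreserving);
  }
}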

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index 2ba2709..ceb6f7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -31,8 +31,8 @@ public class CSAssignment {
   final private Resource resource;
   private NodeType type;
-  private final RMContainer excessReservation;
-  private final FiCaSchedulerApp application;
+  private RMContainer excessReservation;
+  private FiCaSchedulerApp application;
   private final boolean skipped;
   private boolean fulfilledReservation;
   private final AssignmentInformation assignmentInformation;
@@ -80,10 +80,18 @@ public class CSAssignment {
     return application;
   }
 
+  public void setApplication(FiCaSchedulerApp application) {
+    this.application = application;
+  }
+
   public RMContainer getExcessReservation() {
     return excessReservation;
   }
 
+  public void setExcessReservation(RMContainer rmContainer) {
+    excessReservation = rmContainer;
+  }
+
   public boolean getSkipped() {
     return skipped;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index c6524c6..a3adf9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -25,22 +25,16 @@ public class CapacityHeadroomProvider {
   LeafQueue.User user;
   LeafQueue queue;
   FiCaSchedulerApp application;
-  Resource required;
   LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
 
-  public CapacityHeadroomProvider(
-    LeafQueue.User user,
-    LeafQueue queue,
-    FiCaSchedulerApp application,
-    Resource required,
-    LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
-
+  public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
+      FiCaSchedulerApp application,
+      LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
+
     this.user = user;
     this.queue = queue;
     this.application = application;
-    this.required = required;
     this.queueResourceLimitsInfo = queueResourceLimitsInfo;
-
   }
 
   public Resource getHeadroom() {
@@ -52,7 +46,7 @@ public class CapacityHeadroomProvider {
       clusterResource = queueResourceLimitsInfo.getClusterResource();
     }
     Resource headroom = queue.getHeadroom(user, queueCurrentLimit,
-        clusterResource, application, required);
+        clusterResource, application);
 
     // Corner case to deal with applications being slightly over-limit
     if (headroom.getMemory() < 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5a20f8b..68e608a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1178,16 +1178,6 @@ public class CapacityScheduler extends
         updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
         schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
       }
-
-      RMContainer excessReservation = assignment.getExcessReservation();
-      if (excessReservation != null) {
-        Container container = excessReservation.getContainer();
-        queue.completedContainer(clusterResource, assignment.getApplication(),
-            node, excessReservation, SchedulerUtils
-                .createAbnormalContainerStatus(container.getId(),
-                    SchedulerUtils.UNRESERVED_CONTAINER),
-            RMContainerEventType.RELEASED, null, true);
-      }
     }
 
     // Try to schedule more if there are no reservations to fulfill
@@ -1241,10 +1231,6 @@ public class CapacityScheduler extends
               RMNodeLabelsManager.NO_LABEL, clusterResource)),
           SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
       updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
-      if (Resources.greaterThan(calculator, clusterResource,
-          assignment.getResource(), Resources.none())) {
-        return;
-      }
     } else {
       LOG.info("Skipping scheduling since node "
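[Editor's note: the excess-reservation release removed from CapacityScheduler
above reappears inside LeafQueue (handleExcessReservedContainer) in the next
file's diff. A minimal hypothetical sketch of that flow; all names are
stand-ins, not the real YARN types.]

class ExcessReservationSketch {
  static class Reservation { }

  static class Assignment {
    Reservation excessReservation; // set by the app when a reservation is obsolete
  }

  static class Queue {
    void releaseAsUnreserved(Reservation r) {
      // stand-in for completedContainer(..., UNRESERVED_CONTAINER, RELEASED, ...)
    }

    // Mirrors handleExcessReservedContainer: release, then clear the field so
    // no later code path releases the same container a second time.
    void handleExcess(Assignment a) {
      if (a.excessReservation != null) {
        releaseAsUnreserved(a.excessReservation);
        a.excessReservation = null;
      }
    }
  }
}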
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5c283f4..acfbad0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -31,7 +31,6 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -42,30 +41,24 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -93,7 +86,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private float maxAMResourcePerQueuePercent;
 
-  private int nodeLocalityDelay;
+  private volatile int nodeLocalityDelay;
 
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
@@ -102,7 +95,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   Set<FiCaSchedulerApp> pendingApplications;
 
-  private float minimumAllocationFactor;
+  private volatile float minimumAllocationFactor;
 
   private Map<String, User> users = new HashMap<String, User>();
@@ -400,11 +393,6 @@ public class LeafQueue extends AbstractCSQueue {
     return Collections.singletonList(userAclInfo);
   }
 
-  @Private
-  public int getNodeLocalityDelay() {
-    return nodeLocalityDelay;
-  }
-
   public String toString() {
     return queueName + ": " +
         "capacity=" + queueCapacities.getCapacity() + ", " +
@@ -745,39 +733,57 @@ public class LeafQueue extends AbstractCSQueue {
     return applicationAttemptMap.get(applicationAttemptId);
   }
 
+  private void handleExcessReservedContainer(Resource clusterResource,
+      CSAssignment assignment) {
+    if (assignment.getExcessReservation() != null) {
+      RMContainer excessReservedContainer = assignment.getExcessReservation();
+
+      completedContainer(clusterResource, assignment.getApplication(),
+          scheduler.getNode(excessReservedContainer.getAllocatedNode()),
+          excessReservedContainer,
+          SchedulerUtils.createAbnormalContainerStatus(
+              excessReservedContainer.getContainerId(),
+              SchedulerUtils.UNRESERVED_CONTAINER),
+          RMContainerEventType.RELEASED, null, false);
+
+      assignment.setExcessReservation(null);
+    }
+  }
+
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode) {
     updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-
-    if(LOG.isDebugEnabled()) {
+
+    if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
-          + " #applications=" +
-          orderingPolicy.getNumSchedulableEntities());
+          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
     }
-
+
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
-      FiCaSchedulerApp application =
+      FiCaSchedulerApp application =
           getApplication(reservedContainer.getApplicationAttemptId());
       synchronized (application) {
-        return assignReservedContainer(application, node, reservedContainer,
+        CSAssignment assignment = application.assignReservedContainer(node,
+            reservedContainer, clusterResource, schedulingMode);
+        handleExcessReservedContainer(clusterResource, assignment);
+        return assignment;
       }
     }
-
+
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
        && !accessibleToPartition(node.getPartition())) {
      return NULL_ASSIGNMENT;
    }
-
+
     // Check if this queue need more resource, simply skip allocation if this
     // queue doesn't need more resources.
-    if (!hasPendingResourceRequest(node.getPartition(),
-        clusterResource, schedulingMode)) {
+    if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
+        schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
             + ", because it doesn't need more resource, schedulingMode="
@@ -785,233 +791,74 @@ public class LeafQueue extends AbstractCSQueue {
       }
       return NULL_ASSIGNMENT;
     }
-
+
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
-        orderingPolicy.getAssignmentIterator();
-        assignmentIterator.hasNext();) {
+        orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
       FiCaSchedulerApp application = assignmentIterator.next();
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("pre-assignContainers for application "
-            + application.getApplicationId());
-        application.showRequests();
+
+      // Check queue max-capacity limit
+      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+          currentResourceLimits, application.getCurrentReservation(),
+          schedulingMode)) {
+        return NULL_ASSIGNMENT;
       }
 
-      // Check if application needs more resource, skip if it doesn't need more.
-      if (!application.hasPendingResourceRequest(resourceCalculator,
-          node.getPartition(), clusterResource, schedulingMode)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
-              + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-label=" + node.getPartition());
-        }
+      Resource userLimit =
+          computeUserLimitAndSetHeadroom(application, clusterResource,
+              node.getPartition(), schedulingMode);
+
+      // Check user limit
+      if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
+          application, node.getPartition(), currentResourceLimits)) {
         continue;
       }
 
-      synchronized (application) {
-        // Check if this resource is on the blacklist
-        if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
-          continue;
-        }
-
-        // Schedule in priority order
-        for (Priority priority : application.getPriorities()) {
-          ResourceRequest anyRequest =
-              application.getResourceRequest(priority, ResourceRequest.ANY);
-          if (null == anyRequest) {
-            continue;
-          }
-
-          // Required resource
-          Resource required = anyRequest.getCapability();
+      // Try to schedule
+      CSAssignment assignment =
+          application.assignContainers(clusterResource, node,
+              currentResourceLimits, schedulingMode);
 
-          // Do we need containers at this 'priority'?
-          if (application.getTotalRequiredResources(priority) <= 0) {
-            continue;
-          }
-
-          // AM container allocation doesn't support non-exclusive allocation to
-          // avoid painful of preempt an AM container
-          if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-            RMAppAttempt rmAppAttempt =
-                csContext.getRMContext().getRMApps()
-                    .get(application.getApplicationId()).getCurrentAppAttempt();
-            if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
-                && null == rmAppAttempt.getMasterContainer()) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Skip allocating AM container to app_attempt="
-                    + application.getApplicationAttemptId()
-                    + ", don't allow to allocate AM container in non-exclusive mode");
-              }
-              break;
-            }
-          }
-
-          // Is the node-label-expression of this offswitch resource request
-          // matches the node's label?
-          // If not match, jump to next priority.
-          if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
-              anyRequest, node.getPartition(), schedulingMode)) {
-            continue;
-          }
-
-          if (!this.reservationsContinueLooking) {
-            if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("doesn't need containers based on reservation algo!");
-              }
-              continue;
-            }
-          }
-
-          // Compute user-limit & set headroom
-          // Note: We compute both user-limit & headroom with the highest
-          // priority request as the target.
-          // This works since we never assign lower priority requests
-          // before all higher priority ones are serviced.
-          Resource userLimit =
-              computeUserLimitAndSetHeadroom(application, clusterResource,
-                  required, node.getPartition(), schedulingMode);
-
-          // Check queue max-capacity limit
-          if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-              currentResourceLimits, required,
-              application.getCurrentReservation(), schedulingMode)) {
-            return NULL_ASSIGNMENT;
-          }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("post-assignContainers for application "
+            + application.getApplicationId());
+        application.showRequests();
+      }
 
-          // Check user limit
-          if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-              application, node.getPartition(), currentResourceLimits)) {
-            break;
-          }
+      // Did we schedule or reserve a container?
+      Resource assigned = assignment.getResource();
+
+      handleExcessReservedContainer(clusterResource, assignment);
 
-          // Inform the application it is about to get a scheduling opportunity
-          application.addSchedulingOpportunity(priority);
-
-          // Increase missed-non-partitioned-resource-request-opportunity.
-          // This is to make sure non-partitioned-resource-request will prefer
-          // to be allocated to non-partitioned nodes
-          int missedNonPartitionedRequestSchedulingOpportunity = 0;
-          if (anyRequest.getNodeLabelExpression().equals(
-              RMNodeLabelsManager.NO_LABEL)) {
-            missedNonPartitionedRequestSchedulingOpportunity =
-                application
-                    .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
-          }
-
-          if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-            // Before doing allocation, we need to check scheduling opportunity to
-            // make sure : non-partitioned resource request should be scheduled to
-            // non-partitioned partition first.
-            if (missedNonPartitionedRequestSchedulingOpportunity < scheduler
-                .getNumClusterNodes()) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Skip app_attempt="
-                    + application.getApplicationAttemptId()
-                    + " priority="
-                    + priority
-                    + " because missed-non-partitioned-resource-request"
-                    + " opportunity under requred:"
-                    + " Now=" + missedNonPartitionedRequestSchedulingOpportunity
-                    + " required="
-                    + scheduler.getNumClusterNodes());
-              }
-
-              break;
-            }
-          }
-
-          // Try to schedule
-          CSAssignment assignment =
-              assignContainersOnNode(clusterResource, node, application, priority,
-                  null, schedulingMode, currentResourceLimits);
-
-          // Did the application skip this node?
-          if (assignment.getSkipped()) {
-            // Don't count 'skipped nodes' as a scheduling opportunity!
-            application.subtractSchedulingOpportunity(priority);
-            continue;
-          }
-
-          // Did we schedule or reserve a container?
-          Resource assigned = assignment.getResource();
-          if (Resources.greaterThan(
-              resourceCalculator, clusterResource, assigned, Resources.none())) {
-            // Get reserved or allocated container from application
-            RMContainer reservedOrAllocatedRMContainer =
-                application.getRMContainer(assignment
-                    .getAssignmentInformation()
-                    .getFirstAllocatedOrReservedContainerId());
-
-            // Book-keeping
-            // Note: Update headroom to account for current allocation too...
-            allocateResource(clusterResource, application, assigned,
-                node.getPartition(), reservedOrAllocatedRMContainer);
-
-            // Don't reset scheduling opportunities for offswitch assignments
-            // otherwise the app will be delayed for each non-local assignment.
-            // This helps apps with many off-cluster requests schedule faster.
-            if (assignment.getType() != NodeType.OFF_SWITCH) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Resetting scheduling opportunities");
-              }
-              application.resetSchedulingOpportunities(priority);
-            }
-            // Non-exclusive scheduling opportunity is different: we need reset
-            // it every time to make sure non-labeled resource request will be
-            // most likely allocated on non-labeled nodes first.
-            application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
-
-            // Done
-            return assignment;
-          } else {
-            // Do not assign out of order w.r.t priorities
-            break;
-          }
-        }
-      }
+      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
+          Resources.none())) {
+        // Get reserved or allocated container from application
+        RMContainer reservedOrAllocatedRMContainer =
+            application.getRMContainer(assignment.getAssignmentInformation()
+                .getFirstAllocatedOrReservedContainerId());
 
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("post-assignContainers for application "
-            + application.getApplicationId());
+        // Book-keeping
+        // Note: Update headroom to account for current allocation too...
+        allocateResource(clusterResource, application, assigned,
+            node.getPartition(), reservedOrAllocatedRMContainer);
+
+        // Done
+        return assignment;
+      } else if (!assignment.getSkipped()) {
+        // If we don't allocate anything, and it is not skipped by application,
+        // we will return to respect FIFO of applications
+        return NULL_ASSIGNMENT;
      }
-      application.showRequests();
    }
-
-    return NULL_ASSIGNMENT;
+    return NULL_ASSIGNMENT;
   }
 
-  private synchronized CSAssignment assignReservedContainer(
-      FiCaSchedulerApp application, FiCaSchedulerNode node,
-      RMContainer rmContainer, Resource clusterResource,
-      SchedulingMode schedulingMode) {
-    // Do we still need this reservation?
-    Priority priority = rmContainer.getReservedPriority();
-    if (application.getTotalRequiredResources(priority) == 0) {
-      // Release
-      return new CSAssignment(application, rmContainer);
-    }
-
-    // Try to assign if we have sufficient resources
-    CSAssignment tmp =
-        assignContainersOnNode(clusterResource, node, application, priority,
-            rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
-
-    // Doesn't matter... since it's already charged for at time of reservation
-    // "re-reservation" is *free*
-    CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-    if (tmp.getAssignmentInformation().getNumAllocations() > 0) {
-      ret.setFulfilledReservation(true);
-    }
-    return ret;
-  }
-
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
-      Resource clusterResource, FiCaSchedulerApp application, Resource required) {
+      Resource clusterResource, FiCaSchedulerApp application) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-        computeUserLimit(application, clusterResource, required, user,
-            RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+        computeUserLimit(application, clusterResource, user,
+            RMNodeLabelsManager.NO_LABEL,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
   }
 
   private Resource getHeadroom(User user, Resource currentResourceLimit,
@@ -1055,7 +902,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
-      Resource clusterResource, Resource required, String nodePartition,
+      Resource clusterResource, String nodePartition,
       SchedulingMode schedulingMode) {
     String user = application.getUser();
     User queueUser = getUser(user);
@@ -1063,8 +910,8 @@ public class LeafQueue extends AbstractCSQueue {
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
     Resource userLimit =
-        computeUserLimit(application, clusterResource, required,
-            queueUser, nodePartition, schedulingMode);
+        computeUserLimit(application, clusterResource, queueUser,
+            nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
 
@@ -1081,7 +928,7 @@ public class LeafQueue extends AbstractCSQueue {
     }
 
     CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
-        queueUser, this, application, required, queueResourceLimitsInfo);
+        queueUser, this, application, queueResourceLimitsInfo);
 
     application.setHeadroomProvider(headroomProvider);
 
@@ -1091,8 +938,13 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Lock(NoLock.class)
+  public int getNodeLocalityDelay() {
+    return nodeLocalityDelay;
+  }
+
+  @Lock(NoLock.class)
   private Resource computeUserLimit(FiCaSchedulerApp application,
-      Resource clusterResource, Resource required, User user,
+      Resource clusterResource, User user,
       String nodePartition, SchedulingMode schedulingMode) {
     // What is our current capacity?
     // * It is equal to the max(required, queue-capacity) if
@@ -1106,6 +958,11 @@ public class LeafQueue extends AbstractCSQueue {
             queueCapacities.getAbsoluteCapacity(nodePartition),
             minimumAllocation);
 
+    // Assume we have required resource equals to minimumAllocation, this can
+    // make sure user limit can continuously increase till queueMaxResource
+    // reached.
+    Resource required = minimumAllocation;
+
     // Allow progress for queues with miniscule capacity
     queueCapacity =
         Resources.max(
 
@@ -1206,8 +1063,8 @@ public class LeafQueue extends AbstractCSQueue {
 
     if (Resources.lessThanOrEqual(
         resourceCalculator,
         clusterResource,
-        Resources.subtract(user.getUsed(),application.getCurrentReservation()),
-        limit)) {
+        Resources.subtract(user.getUsed(),
+            application.getCurrentReservation()), limit)) {
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName()
@@ -1215,13 +1072,11 @@ public class LeafQueue extends AbstractCSQueue {
             + user.getUsed() + " reserved: "
            + application.getCurrentReservation() + " limit: " + limit);
       }
-      Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(nodePartition), limit);
-      // we can only acquire a new container if we unreserve first since we ignored the
-      // user limit. Choose the max of user limit or what was previously set by max
-      // capacity.
-      currentResoureLimits.setAmountNeededUnreserve(
-          Resources.max(resourceCalculator, clusterResource,
-              currentResoureLimits.getAmountNeededUnreserve(), amountNeededToUnreserve));
+      Resource amountNeededToUnreserve =
+          Resources.subtract(user.getUsed(nodePartition), limit);
+      // we can only acquire a new container if we unreserve first to
+      // respect user-limit
+      currentResoureLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
       return true;
     }
   }
@@ -1235,476 +1090,6 @@ public class LeafQueue extends AbstractCSQueue {
     return true;
   }
 
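[Editor's note: a worked example, with hypothetical numbers, of the
unreserve-to-respect-user-limit path in the hunk above — a user is over its
limit only because of its own reservations.]

public class UserLimitExample {
  public static void main(String[] args) {
    int userUsedMB = 40 * 1024;  // includes 8 GB of reserved containers
    int reservedMB = 8 * 1024;
    int userLimitMB = 36 * 1024; // computed user limit for this partition

    // used - reserved <= limit: the user exceeds its limit only because of
    // reservations, so scheduling may proceed if it unreserves the difference.
    boolean canAssign = (userUsedMB - reservedMB) <= userLimitMB; // true
    int amountNeededToUnreserveMB = userUsedMB - userLimitMB;     // 4 GB

    System.out.println(canAssign + ", unreserve " + amountNeededToUnreserveMB + " MB");
  }
}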
-  boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
-      Priority priority, Resource required) {
-    int requiredContainers = application.getTotalRequiredResources(priority);
-    int reservedContainers = application.getNumReservedContainers(priority);
-    int starvation = 0;
-    if (reservedContainers > 0) {
-      float nodeFactor =
-          Resources.ratio(
-              resourceCalculator, required, getMaximumAllocation()
-              );
-
-      // Use percentage of node required to bias against large containers...
-      // Protect against corner case where you need the whole node with
-      // Math.min(nodeFactor, minimumAllocationFactor)
-      starvation =
-          (int)((application.getReReservations(priority) / (float)reservedContainers) *
-                (1.0f - (Math.min(nodeFactor, getMinimumAllocationFactor())))
-               );
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("needsContainers:" +
-            " app.#re-reserve=" + application.getReReservations(priority) +
-            " reserved=" + reservedContainers +
-            " nodeFactor=" + nodeFactor +
-            " minAllocFactor=" + getMinimumAllocationFactor() +
-            " starvation=" + starvation);
-      }
-    }
-    return (((starvation + requiredContainers) - reservedContainers) > 0);
-  }
-
-  private CSAssignment assignContainersOnNode(Resource clusterResource,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
-
-    CSAssignment assigned;
-
-    NodeType requestType = null;
-    MutableObject allocatedContainer = new MutableObject();
-    // Data-local
-    ResourceRequest nodeLocalResourceRequest =
-        application.getResourceRequest(priority, node.getNodeName());
-    if (nodeLocalResourceRequest != null) {
-      requestType = NodeType.NODE_LOCAL;
-      assigned =
-          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
-              node, application, priority, reservedContainer,
-              allocatedContainer, schedulingMode, currentResoureLimits);
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          assigned.getResource(), Resources.none())) {
-
-        //update locality statistics
-        if (allocatedContainer.getValue() != null) {
-          application.incNumAllocatedContainers(NodeType.NODE_LOCAL,
-              requestType);
-        }
-        assigned.setType(NodeType.NODE_LOCAL);
-        return assigned;
-      }
-    }
-
-    // Rack-local
-    ResourceRequest rackLocalResourceRequest =
-        application.getResourceRequest(priority, node.getRackName());
-    if (rackLocalResourceRequest != null) {
-      if (!rackLocalResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-
-      if (requestType != NodeType.NODE_LOCAL) {
-        requestType = NodeType.RACK_LOCAL;
-      }
-
-      assigned =
-          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
-              node, application, priority, reservedContainer,
-              allocatedContainer, schedulingMode, currentResoureLimits);
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          assigned.getResource(), Resources.none())) {
-
-        //update locality statistics
-        if (allocatedContainer.getValue() != null) {
-          application.incNumAllocatedContainers(NodeType.RACK_LOCAL,
-              requestType);
-        }
-        assigned.setType(NodeType.RACK_LOCAL);
-        return assigned;
-      }
-    }
-
-    // Off-switch
-    ResourceRequest offSwitchResourceRequest =
-        application.getResourceRequest(priority, ResourceRequest.ANY);
-    if (offSwitchResourceRequest != null) {
-      if (!offSwitchResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-      if (requestType != NodeType.NODE_LOCAL
-          && requestType != NodeType.RACK_LOCAL) {
-        requestType = NodeType.OFF_SWITCH;
-      }
-
-      assigned =
-          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-              node, application, priority, reservedContainer,
-              allocatedContainer, schedulingMode, currentResoureLimits);
-
-      // update locality statistics
-      if (allocatedContainer.getValue() != null) {
-        application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
-      }
-      assigned.setType(NodeType.OFF_SWITCH);
-      return assigned;
-    }
-
-    return SKIP_ASSIGNMENT;
-  }
-
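[Editor's note: a worked example, with hypothetical numbers, of the starvation
heuristic removed above (it moves to FiCaSchedulerApp in this commit):
starvation = (reReservations / reserved) * (1 - min(nodeFactor, minAllocFactor)).]

public class StarvationExample {
  public static void main(String[] args) {
    int requiredContainers = 2;
    int reservedContainers = 2;
    int reReservations = 6;      // the app kept re-reserving the same nodes
    float nodeFactor = 0.5f;     // container size relative to max allocation
    float minAllocFactor = 0.25f;

    int starvation = (int) ((reReservations / (float) reservedContainers)
        * (1.0f - Math.min(nodeFactor, minAllocFactor))); // (6/2) * 0.75 -> 2

    // 2 + 2 - 2 > 0: the app is starving on its reservations, so allow it to
    // allocate or reserve a new container instead of waiting indefinitely.
    System.out.println((starvation + requiredContainers - reservedContainers) > 0);
  }
}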
-  @Private
-  protected boolean findNodeToUnreserve(Resource clusterResource,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      Resource minimumUnreservedResource) {
-    // need to unreserve some other container first
-    NodeId idToUnreserve =
-        application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
-            resourceCalculator, clusterResource);
-    if (idToUnreserve == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("checked to see if could unreserve for app but nothing "
-            + "reserved that matches for this app");
-      }
-      return false;
-    }
-    FiCaSchedulerNode nodeToUnreserve = scheduler.getNode(idToUnreserve);
-    if (nodeToUnreserve == null) {
-      LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
-      return false;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("unreserving for app: " + application.getApplicationId()
-          + " on nodeId: " + idToUnreserve
-          + " in order to replace reserved application and place it on node: "
-          + node.getNodeID() + " needing: " + minimumUnreservedResource);
-    }
-
-    // headroom
-    Resources.addTo(application.getHeadroom(), nodeToUnreserve
-        .getReservedContainer().getReservedResource());
-
-    // Make sure to not have completedContainers sort the queues here since
-    // we are already inside an iterator loop for the queues and this would
-    // cause an concurrent modification exception.
-    completedContainer(clusterResource, application, nodeToUnreserve,
-        nodeToUnreserve.getReservedContainer(),
-        SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve
-            .getReservedContainer().getContainerId(),
-            SchedulerUtils.UNRESERVED_CONTAINER),
-        RMContainerEventType.RELEASED, null, false);
-    return true;
-  }
-
-  private CSAssignment assignNodeLocalContainers(Resource clusterResource,
-      ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-  }
-
-  private CSAssignment assignRackLocalContainers(Resource clusterResource,
-      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
-  }
-
-  private CSAssignment assignOffSwitchContainers(Resource clusterResource,
-      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
-  }
-
-  private int getActualNodeLocalityDelay() {
-    return Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay());
-  }
-
-  boolean canAssign(FiCaSchedulerApp application, Priority priority,
-      FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
-
-    // Clearly we need containers for this application...
-    if (type == NodeType.OFF_SWITCH) {
-      if (reservedContainer != null) {
-        return true;
-      }
-
-      // 'Delay' off-switch
-      ResourceRequest offSwitchRequest =
-          application.getResourceRequest(priority, ResourceRequest.ANY);
-      long missedOpportunities = application.getSchedulingOpportunities(priority);
-      long requiredContainers = offSwitchRequest.getNumContainers();
-
-      float localityWaitFactor =
-          application.getLocalityWaitFactor(priority,
-              scheduler.getNumClusterNodes());
-
-      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
-    }
-
-    // Check if we need containers on this rack
-    ResourceRequest rackLocalRequest =
-        application.getResourceRequest(priority, node.getRackName());
-    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
-      return false;
-    }
-
-    // If we are here, we do need containers on this rack for RACK_LOCAL req
-    if (type == NodeType.RACK_LOCAL) {
-      // 'Delay' rack-local just a little bit...
-      long missedOpportunities = application.getSchedulingOpportunities(priority);
-      return getActualNodeLocalityDelay() < missedOpportunities;
-    }
-
-    // Check if we need containers on this host
-    if (type == NodeType.NODE_LOCAL) {
-      // Now check if we need containers on this host...
-      ResourceRequest nodeLocalRequest =
-          application.getResourceRequest(priority, node.getNodeName());
-      if (nodeLocalRequest != null) {
-        return nodeLocalRequest.getNumContainers() > 0;
-      }
-    }
-
-    return false;
-  }
-
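[Editor's note: a worked example, with hypothetical numbers, of the
delay-scheduling checks in the canAssign method removed above (this logic also
moves to FiCaSchedulerApp in this commit).]

public class LocalityDelayExample {
  public static void main(String[] args) {
    int numClusterNodes = 100;
    int nodeLocalityDelay = 40;    // yarn.scheduler.capacity.node-locality-delay
    long missedOpportunities = 45; // nodes offered without a local assignment

    // Rack-local is delayed until min(#nodes, delay) opportunities were missed.
    int actualDelay = Math.min(numClusterNodes, nodeLocalityDelay); // 40
    boolean allowRackLocal = actualDelay < missedOpportunities;     // true

    // Off-switch waits proportionally to how many containers are still needed.
    long requiredContainers = 10;
    float localityWaitFactor = 0.1f; // e.g. required hosts / cluster size
    boolean allowOffSwitch =
        requiredContainers * localityWaitFactor < missedOpportunities; // 1 < 45

    System.out.println(allowRackLocal + " " + allowOffSwitch);
  }
}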
-  private Container getContainer(RMContainer rmContainer,
-      FiCaSchedulerApp application, FiCaSchedulerNode node,
-      Resource capability, Priority priority) {
-    return (rmContainer != null) ? rmContainer.getContainer() :
-        createContainer(application, node, capability, priority);
-  }
-
-  Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
-      Resource capability, Priority priority) {
-
-    NodeId nodeId = node.getRMNode().getNodeID();
-    ContainerId containerId = BuilderUtils.newContainerId(application
-        .getApplicationAttemptId(), application.getNewContainerId());
-
-    // Create the container
-    return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-        .getHttpAddress(), capability, priority, null);
-
-  }
-
-  private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      ResourceRequest request, NodeType type, RMContainer rmContainer,
-      MutableObject createdContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getNodeName()
-          + " application=" + application.getApplicationId()
-          + " priority=" + priority.getPriority()
-          + " request=" + request + " type=" + type);
-    }
-
-    // check if the resource request can access the label
-    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
-        node.getPartition(), schedulingMode)) {
-      // this is a reserved container, but we cannot allocate it now according
-      // to label not match. This can be caused by node label changed
-      // We should un-reserve this container.
-      if (rmContainer != null) {
-        unreserve(application, priority, node, rmContainer);
-      }
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    Resource capability = request.getCapability();
-    Resource available = node.getAvailableResource();
-    Resource totalResource = node.getTotalResource();
-
-    if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
-        capability, totalResource)) {
-      LOG.warn("Node : " + node.getNodeID()
-          + " does not have sufficient resource for request : " + request
-          + " node total capability : " + node.getTotalResource());
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    assert Resources.greaterThan(
-        resourceCalculator, clusterResource, available, Resources.none());
-
-    // Create the container if necessary
-    Container container =
-        getContainer(rmContainer, application, node, capability, priority);
-
-    // something went wrong getting/creating the container
-    if (container == null) {
-      LOG.warn("Couldn't get container for allocation!");
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
-        application, priority, capability);
-
-    // Can we allocate a container on this node?
-    int availableContainers =
-        resourceCalculator.computeAvailableContainers(available, capability);
-
-    boolean needToUnreserve = Resources.greaterThan(resourceCalculator, clusterResource,
-        currentResoureLimits.getAmountNeededUnreserve(), Resources.none());
-
-    if (availableContainers > 0) {
-      // Allocate...
-
-      // Did we previously reserve containers at this 'priority'?
-      if (rmContainer != null) {
-        unreserve(application, priority, node, rmContainer);
-      } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
-        // when reservationsContinueLooking is set, we may need to unreserve
-        // some containers to meet this queue, its parents', or the users' resource limits.
-        // TODO, need change here when we want to support continuous reservation
-        // looking for labeled partitions.
-        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
-          // If we shouldn't allocate/reserve new container then we should unreserve one the same
-          // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve
-          // could be zero. If the limit was hit then use the amount we need to unreserve to be
-          // under the limit.
-          Resource amountToUnreserve = capability;
-          if (needToUnreserve) {
-            amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve();
-          }
-          boolean containerUnreserved =
-              findNodeToUnreserve(clusterResource, node, application, priority,
-                  amountToUnreserve);
-          // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
-          // container (That means we *have to* unreserve some resource to
-          // continue)). If we failed to unreserve some resource, we can't continue.
-          if (!containerUnreserved) {
-            return new CSAssignment(Resources.none(), type);
-          }
-        }
-      }
-
-      // Inform the application
-      RMContainer allocatedContainer =
-          application.allocate(type, node, priority, request, container);
-
-      // Does the application need this resource?
-      if (allocatedContainer == null) {
-        return new CSAssignment(Resources.none(), type);
-      }
-
-      // Inform the node
-      node.allocateContainer(allocatedContainer);
-
-      // Inform the ordering policy
-      orderingPolicy.containerAllocated(application, allocatedContainer);
-
-      LOG.info("assignedContainer" +
-          " application attempt=" + application.getApplicationAttemptId() +
-          " container=" + container +
-          " queue=" + this +
-          " clusterResource=" + clusterResource);
-      createdContainer.setValue(allocatedContainer);
-      CSAssignment assignment = new CSAssignment(container.getResource(), type);
-      assignment.getAssignmentInformation().addAllocationDetails(
-          container.getId(), getQueuePath());
-      assignment.getAssignmentInformation().incrAllocations();
-      Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-          container.getResource());
-      return assignment;
-    } else {
-      // if we are allowed to allocate but this node doesn't have space, reserve it or
-      // if this was an already a reserved container, reserve it again
-      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
-
-        if (reservationsContinueLooking && rmContainer == null) {
-          // we could possibly ignoring queue capacity or user limits when
-          // reservationsContinueLooking is set. Make sure we didn't need to unreserve
-          // one.
-          if (needToUnreserve) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("we needed to unreserve to be able to allocate");
-            }
-            return new CSAssignment(Resources.none(), type);
-          }
-        }
-
-        // Reserve by 'charging' in advance...
-        reserve(application, priority, node, rmContainer, container);
-
-        LOG.info("Reserved container " +
-            " application=" + application.getApplicationId() +
-            " resource=" + request.getCapability() +
-            " queue=" + this.toString() +
-            " usedCapacity=" + getUsedCapacity() +
-            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + queueUsage.getUsed() +
-            " cluster=" + clusterResource);
-        CSAssignment assignment =
-            new CSAssignment(request.getCapability(), type);
-        assignment.getAssignmentInformation().addReservationDetails(
-            container.getId(), getQueuePath());
-        assignment.getAssignmentInformation().incrReservations();
-        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-            request.getCapability());
-        return assignment;
-      }
-      return new CSAssignment(Resources.none(), type);
-    }
-  }
-
-  private void reserve(FiCaSchedulerApp application, Priority priority,
-      FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
-    // Update reserved metrics if this is the first reservation
-    if (rmContainer == null) {
-      getMetrics().reserveResource(
-          application.getUser(), container.getResource());
-    }
-
-    // Inform the application
-    rmContainer = application.reserve(node, priority, rmContainer, container);
-
-    // Update the node
-    node.reserveResource(application, priority, rmContainer);
-  }
-
-  private boolean unreserve(FiCaSchedulerApp application, Priority priority,
-      FiCaSchedulerNode node, RMContainer rmContainer) {
-    // Done with the reservation?
-    if (application.unreserve(node, priority)) {
-      node.unreserveResource(application);
-
-      // Update reserved metrics
-      getMetrics().unreserveResource(application.getUser(),
-          rmContainer.getContainer().getResource());
-      return true;
-    }
-    return false;
-  }
-
   @Override
   public void completedContainer(Resource clusterResource,
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
@@ -1724,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue {
         // happen under scheduler's lock...
         // So, this is, in effect, a transaction across application & node
         if (rmContainer.getState() == RMContainerState.RESERVED) {
-          removed = unreserve(application, rmContainer.getReservedPriority(),
+          removed = application.unreserve(rmContainer.getReservedPriority(),
               node, rmContainer);
         } else {
           removed =
@@ -1838,15 +1223,17 @@ public class LeafQueue extends AbstractCSQueue {
     // Even if ParentQueue will set limits respect child's max queue capacity,
     // but when allocating reserved container, CapacityScheduler doesn't do
     // this. So need cap limits by queue's max capacity here.
-    this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
+    this.cachedResourceLimitsForHeadroom =
+        new ResourceLimits(currentResourceLimits.getLimit());
     Resource queueMaxResource =
         Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
             .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
             queueCapacities
                 .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
             minimumAllocation);
-    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator,
-        clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
+    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
+        resourceCalculator, clusterResource, queueMaxResource,
+        currentResourceLimits.getLimit()));
   }
 
   @Override
@@ -1874,7 +1261,7 @@ public class LeafQueue extends AbstractCSQueue {
         orderingPolicy.getSchedulableEntities()) {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource,
-            Resources.none(), RMNodeLabelsManager.NO_LABEL,
+            RMNodeLabelsManager.NO_LABEL,
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d725cf9d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 5807dd1..e54b9e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue {
   final PartitionedQueueComparator partitionQueueComparator;
   volatile int numApplications;
   private final CapacitySchedulerContext scheduler;
+  private boolean needToResortQueuesAtNextAllocation = false;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -411,7 +412,7 @@ public class ParentQueue extends AbstractCSQueue {
       // This will also consider parent's limits and also continuous reservation
       // looking
       if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-          resourceLimits, minimumAllocation, Resources.createResource(
+          resourceLimits, Resources.createResource(
               getMetrics().getReservedMB(), getMetrics()
                   .getReservedVirtualCores()), schedulingMode)) {
         break;
@@ -527,6 +528,14 @@ public class ParentQueue extends AbstractCSQueue {
 
   private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) {
     if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+      if (needToResortQueuesAtNextAllocation) {
+        // If we skipped resort queues last time, we need to re-sort queue
+        // before allocation
+        List<CSQueue> childrenList = new ArrayList<>(childQueues);
+        childQueues.clear();
+        childQueues.addAll(childrenList);
+        needToResortQueuesAtNextAllocation = false;
+      }
       return childQueues.iterator();
     }
 
@@ -644,6 +653,11 @@ public class ParentQueue extends AbstractCSQueue {
         }
       }
     }
+
+    // If we skipped sort queue this time, we need to resort queues to make
+    // sure we allocate from least usage (or order defined by queue policy)
+    // queues.
+    needToResortQueuesAtNextAllocation = !sortQueues;
   }
 
   // Inform the parent
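[Editor's note: why clearing and re-adding re-sorts in the ParentQueue hunk
above — the child-queue collection is ordered by a comparator over usage values
that change while elements are already inside it. A minimal hypothetical
illustration; the integer "queues" and usage array are stand-ins.]

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class ResortExample {
  public static void main(String[] args) {
    // Stand-in for childQueues: ordered by a usage value that can change
    // while the element is already inside the set.
    double[] usage = {0.9, 0.1};
    TreeSet<Integer> childQueues =
        new TreeSet<>(Comparator.comparingDouble(i -> usage[i]));
    childQueues.add(0);
    childQueues.add(1); // iteration order now: [1, 0]

    usage[1] = 0.95; // usage changed after insertion; TreeSet does NOT re-sort

    // The trick from sortAndGetChildrenAllocationIterator: rebuild the set so
    // the comparator runs again against the fresh usage values.
    List<Integer> copy = new ArrayList<>(childQueues);
    childQueues.clear();
    childQueues.addAll(copy); // iteration order now: [0, 1]

    System.out.println(childQueues);
  }
}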