Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3353717E56 for ; Tue, 14 Apr 2015 18:46:50 +0000 (UTC) Received: (qmail 69510 invoked by uid 500); 14 Apr 2015 18:46:43 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 69148 invoked by uid 500); 14 Apr 2015 18:46:43 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 68666 invoked by uid 99); 14 Apr 2015 18:46:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Apr 2015 18:46:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D238DE0419; Tue, 14 Apr 2015 18:46:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Tue, 14 Apr 2015 18:46:44 -0000 Message-Id: <21192fc7c4f24e5596d09de94f160c20@git.apache.org> In-Reply-To: <33a96736688348cca0133f07b8ff3962@git.apache.org> References: <33a96736688348cca0133f07b8ff3962@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] hadoop git commit: YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. Contributed by Wangda Tan (cherry picked from commit 0fefda645bca935b87b6bb8ca63e6f18340d59f5) YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. 
Contributed by Wangda Tan (cherry picked from commit 0fefda645bca935b87b6bb8ca63e6f18340d59f5) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9ebbf1bf Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9ebbf1bf Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9ebbf1bf Branch: refs/heads/branch-2 Commit: 9ebbf1bfcea9942117727c08c6905dd444c230ae Parents: 81bbee6 Author: Jian He Authored: Tue Apr 14 11:36:37 2015 -0700 Committer: Jian He Committed: Tue Apr 14 11:46:35 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/server/utils/BuilderUtils.java | 1 + .../rmapp/attempt/RMAppAttemptImpl.java | 10 +- .../scheduler/AppSchedulingInfo.java | 25 +- .../scheduler/ResourceUsage.java | 8 + .../scheduler/SchedulerApplicationAttempt.java | 44 +- .../scheduler/SchedulerUtils.java | 87 +- .../scheduler/capacity/AbstractCSQueue.java | 243 +++-- .../scheduler/capacity/CSQueue.java | 5 +- .../scheduler/capacity/CapacityScheduler.java | 91 +- .../CapacitySchedulerConfiguration.java | 5 + .../scheduler/capacity/LeafQueue.java | 368 ++++--- .../scheduler/capacity/ParentQueue.java | 59 +- .../scheduler/capacity/SchedulingMode.java | 44 + .../server/resourcemanager/Application.java | 4 + .../yarn/server/resourcemanager/MockAM.java | 8 +- .../yarn/server/resourcemanager/MockRM.java | 35 +- .../capacity/TestApplicationLimits.java | 8 +- .../scheduler/capacity/TestChildQueueOrder.java | 41 +- .../capacity/TestContainerAllocation.java | 390 +------ .../scheduler/capacity/TestLeafQueue.java | 148 +-- .../TestNodeLabelContainerAllocation.java | 1027 ++++++++++++++++++ .../scheduler/capacity/TestParentQueue.java | 111 +- .../scheduler/capacity/TestReservations.java | 101 +- .../scheduler/capacity/TestUtils.java | 2 + 25 files changed, 1914 insertions(+), 954 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 478d0ae..059c5a3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -24,6 +24,9 @@ Release 2.8.0 - UNRELEASED YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support for new resource types on the NM. (Sidharta Seethana via junping_du) + YARN-3361. CapacityScheduler side changes to support non-exclusive node + labels. (Wangda Tan via jianhe) + IMPROVEMENTS YARN-1880. Cleanup TestApplicationClientProtocolOnHA http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 68d4ef9..f2146c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -313,6 +313,7 @@ public class BuilderUtils { request.setResourceName(r.getResourceName()); request.setCapability(r.getCapability()); request.setNumContainers(r.getNumContainers()); + request.setNodeLabelExpression(r.getNodeLabelExpression()); return request; } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1be1727..1071831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -146,7 +146,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private ConcurrentMap> finishedContainersSentToAM = new ConcurrentHashMap>(); - private Container masterContainer; + private volatile Container masterContainer; private float progress = 0; private String host = "N/A"; @@ -762,13 +762,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @Override public Container getMasterContainer() { - this.readLock.lock(); - - try { - return this.masterContainer; - } finally { - this.readLock.unlock(); - } + return this.masterContainer; } @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 5521d47..5604f0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -73,10 +73,11 @@ public class AppSchedulingInfo { /* Allocated by scheduler */ boolean pending = true; // for app metrics + private ResourceUsage appResourceUsage; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, - long epoch) { + long epoch, ResourceUsage appResourceUsage) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; @@ -84,6 +85,7 @@ public class AppSchedulingInfo { this.user = user; this.activeUsersManager = activeUsersManager; this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT); + this.appResourceUsage = appResourceUsage; } public ApplicationId getApplicationId() { @@ -191,13 +193,19 @@ public class AppSchedulingInfo { lastRequestCapability); // update queue: + Resource increasedResource = Resources.multiply(request.getCapability(), + request.getNumContainers()); queue.incPendingResource( request.getNodeLabelExpression(), - Resources.multiply(request.getCapability(), - request.getNumContainers())); + increasedResource); + 
appResourceUsage.incPending(request.getNodeLabelExpression(), increasedResource); if (lastRequest != null) { + Resource decreasedResource = + Resources.multiply(lastRequestCapability, lastRequestContainers); queue.decPendingResource(lastRequest.getNodeLabelExpression(), - Resources.multiply(lastRequestCapability, lastRequestContainers)); + decreasedResource); + appResourceUsage.decPending(lastRequest.getNodeLabelExpression(), + decreasedResource); } } } @@ -385,6 +393,8 @@ public class AppSchedulingInfo { checkForDeactivation(); } + appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(), + offSwitchRequest.getCapability()); queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(), offSwitchRequest.getCapability()); } @@ -492,9 +502,10 @@ public class AppSchedulingInfo { } public ResourceRequest cloneResourceRequest(ResourceRequest request) { - ResourceRequest newRequest = ResourceRequest.newInstance( - request.getPriority(), request.getResourceName(), - request.getCapability(), 1, request.getRelaxLocality()); + ResourceRequest newRequest = + ResourceRequest.newInstance(request.getPriority(), + request.getResourceName(), request.getCapability(), 1, + request.getRelaxLocality(), request.getNodeLabelExpression()); return newRequest; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 36ee4da..5169b78 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -250,6 +251,10 @@ public class ResourceUsage { } private Resource _get(String label, ResourceType type) { + if (label == null) { + label = RMNodeLabelsManager.NO_LABEL; + } + try { readLock.lock(); UsageByLabel usage = usages.get(label); @@ -263,6 +268,9 @@ public class ResourceUsage { } private UsageByLabel getAndAddIfMissing(String label) { + if (label == null) { + label = RMNodeLabelsManager.NO_LABEL; + } if (!usages.containsKey(label)) { UsageByLabel u = new UsageByLabel(label); usages.put(label, u); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 5e0bbc7..fccf766 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.base.Preconditions; @@ -108,14 +110,24 @@ public class SchedulerApplicationAttempt { private Set pendingRelease = null; /** - * Count how many times the application has been given an opportunity - * to schedule a task at each priority. Each time the scheduler - * asks the application for a task at this priority, it is incremented, - * and each time the application successfully schedules a task, it + * Count how many times the application has been given an opportunity to + * schedule a task at each priority. Each time the scheduler asks the + * application for a task at this priority, it is incremented, and each time + * the application successfully schedules a task (at rack or node local), it * is reset to 0. */ Multiset schedulingOpportunities = HashMultiset.create(); + /** + * Count how many times the application has been given an opportunity to + * schedule a non-partitioned resource request at each priority. 
Each time the + * scheduler asks the application for a task at this priority, it is + * incremented, and each time the application successfully schedules a task, + * it is reset to 0 when schedule any task at corresponding priority. + */ + Multiset missedNonPartitionedRequestSchedulingOpportunity = + HashMultiset.create(); + // Time of the last container scheduled at the current allowed level protected Map lastScheduledContainer = new HashMap(); @@ -132,7 +144,7 @@ public class SchedulerApplicationAttempt { this.rmContext = rmContext; this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager, rmContext.getEpoch()); + activeUsersManager, rmContext.getEpoch(), attemptResourceUsage); this.queue = queue; this.pendingRelease = new HashSet(); this.attemptId = applicationAttemptId; @@ -489,6 +501,18 @@ public class SchedulerApplicationAttempt { return this.appSchedulingInfo.isBlacklisted(resourceName); } + public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity( + Priority priority) { + missedNonPartitionedRequestSchedulingOpportunity.add(priority); + return missedNonPartitionedRequestSchedulingOpportunity.count(priority); + } + + public synchronized void + resetMissedNonPartitionedRequestSchedulingOpportunity(Priority priority) { + missedNonPartitionedRequestSchedulingOpportunity.setCount(priority, 0); + } + + public synchronized void addSchedulingOpportunity(Priority priority) { schedulingOpportunities.setCount(priority, schedulingOpportunities.count(priority) + 1); @@ -518,6 +542,7 @@ public class SchedulerApplicationAttempt { public synchronized void resetSchedulingOpportunities(Priority priority) { resetSchedulingOpportunities(priority, System.currentTimeMillis()); } + // used for continuous scheduling public synchronized void resetSchedulingOpportunities(Priority priority, long currentTimeMs) { @@ -669,4 +694,13 @@ public class SchedulerApplicationAttempt { public Set getBlacklistedNodes() { return 
this.appSchedulingInfo.getBlackListCopy(); } + + @Private + public boolean hasPendingResourceRequest(ResourceCalculator rc, + String nodePartition, Resource cluster, + SchedulingMode schedulingMode) { + return SchedulerUtils.hasPendingResourceRequest(rc, + this.attemptResourceUsage, nodePartition, cluster, + schedulingMode); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 248cc08..7a1a528 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -37,11 +37,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.collect.Sets; - /** * Utilities shared by schedulers. 
*/ @@ -235,9 +234,13 @@ public class SchedulerUtils { if (labelExp == null && queueInfo != null && ResourceRequest.ANY.equals(resReq.getResourceName())) { labelExp = queueInfo.getDefaultNodeLabelExpression(); - resReq.setNodeLabelExpression(labelExp); } + // If labelExp still equals to null, set it to be NO_LABEL + resReq + .setNodeLabelExpression(labelExp == null ? RMNodeLabelsManager.NO_LABEL + : labelExp); + // we don't allow specify label expression other than resourceName=ANY now if (!ResourceRequest.ANY.equals(resReq.getResourceName()) && labelExp != null && !labelExp.trim().isEmpty()) { @@ -273,25 +276,6 @@ public class SchedulerUtils { } } - public static boolean checkQueueAccessToNode(Set queueLabels, - Set nodeLabels) { - // if queue's label is *, it can access any node - if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) { - return true; - } - // any queue can access to a node without label - if (nodeLabels == null || nodeLabels.isEmpty()) { - return true; - } - // a queue can access to a node only if it contains any label of the node - if (queueLabels != null - && Sets.intersection(queueLabels, nodeLabels).size() > 0) { - return true; - } - // sorry, you cannot access - return false; - } - public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr, Set labels) throws IOException { if (mgr == null) { @@ -311,26 +295,6 @@ public class SchedulerUtils { } } } - - public static boolean checkNodeLabelExpression(Set nodeLabels, - String labelExpression) { - // empty label expression can only allocate on node with empty labels - if (labelExpression == null || labelExpression.trim().isEmpty()) { - if (!nodeLabels.isEmpty()) { - return false; - } - } - - if (labelExpression != null) { - for (String str : labelExpression.split("&&")) { - if (!str.trim().isEmpty() - && (nodeLabels == null || !nodeLabels.contains(str.trim()))) { - return false; - } - } - } - return true; - } public static boolean checkQueueLabelExpression(Set 
queueLabels, String labelExpression) { @@ -360,4 +324,43 @@ public class SchedulerUtils { } return null; } + + public static boolean checkResourceRequestMatchingNodePartition( + ResourceRequest offswitchResourceRequest, String nodePartition, + SchedulingMode schedulingMode) { + // We will only look at node label = nodeLabelToLookAt according to + // schedulingMode and partition of node. + String nodePartitionToLookAt = null; + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + nodePartitionToLookAt = nodePartition; + } else { + nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; + } + + String askedNodePartition = offswitchResourceRequest.getNodeLabelExpression(); + if (null == askedNodePartition) { + askedNodePartition = RMNodeLabelsManager.NO_LABEL; + } + return askedNodePartition.equals(nodePartitionToLookAt); + } + + private static boolean hasPendingResourceRequest(ResourceCalculator rc, + ResourceUsage usage, String partitionToLookAt, Resource cluster) { + if (Resources.greaterThan(rc, cluster, + usage.getPending(partitionToLookAt), Resources.none())) { + return true; + } + return false; + } + + @Private + public static boolean hasPendingResourceRequest(ResourceCalculator rc, + ResourceUsage usage, String nodePartition, Resource cluster, + SchedulingMode schedulingMode) { + String partitionToLookAt = nodePartition; + if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + partitionToLookAt = RMNodeLabelsManager.NO_LABEL; + } + return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 42ea089..d95c45c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -38,12 +37,12 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; @@ -56,6 +55,11 @@ import 
com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); + static final CSAssignment NULL_ASSIGNMENT = + new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); + + static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); + CSQueue parent; final String queueName; volatile int numContainers; @@ -343,16 +347,8 @@ public abstract class AbstractCSQueue implements CSQueue { } synchronized void allocateResource(Resource clusterResource, - Resource resource, Set nodeLabels) { - - // Update usedResources by labels - if (nodeLabels == null || nodeLabels.isEmpty()) { - queueUsage.incUsed(resource); - } else { - for (String label : Sets.intersection(accessibleLabels, nodeLabels)) { - queueUsage.incUsed(label, resource); - } - } + Resource resource, String nodePartition) { + queueUsage.incUsed(nodePartition, resource); ++numContainers; CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(), @@ -360,15 +356,8 @@ public abstract class AbstractCSQueue implements CSQueue { } protected synchronized void releaseResource(Resource clusterResource, - Resource resource, Set nodeLabels) { - // Update usedResources by labels - if (null == nodeLabels || nodeLabels.isEmpty()) { - queueUsage.decUsed(resource); - } else { - for (String label : Sets.intersection(accessibleLabels, nodeLabels)) { - queueUsage.decUsed(label, resource); - } - } + Resource resource, String nodePartition) { + queueUsage.decUsed(nodePartition, resource); CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(), clusterResource, minimumAllocation); @@ -434,103 +423,108 @@ public abstract class AbstractCSQueue implements CSQueue { parentQ.getPreemptionDisabled()); } - private Resource getCurrentLimitResource(String nodeLabel, - Resource clusterResource, ResourceLimits currentResourceLimits) { - /* - * Current limit resource: For labeled resource: limit = 
queue-max-resource - * (TODO, this part need update when we support labeled-limit) For - * non-labeled resource: limit = min(queue-max-resource, - * limit-set-by-parent) - */ - Resource queueMaxResource = - Resources.multiplyAndNormalizeDown(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, clusterResource), - queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation); - if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) { - return Resources.min(resourceCalculator, clusterResource, - queueMaxResource, currentResourceLimits.getLimit()); + private Resource getCurrentLimitResource(String nodePartition, + Resource clusterResource, ResourceLimits currentResourceLimits, + SchedulingMode schedulingMode) { + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + /* + * Current limit resource: For labeled resource: limit = queue-max-resource + * (TODO, this part need update when we support labeled-limit) For + * non-labeled resource: limit = min(queue-max-resource, + * limit-set-by-parent) + */ + Resource queueMaxResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, + labelManager.getResourceByLabel(nodePartition, clusterResource), + queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation); + if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + return Resources.min(resourceCalculator, clusterResource, + queueMaxResource, currentResourceLimits.getLimit()); + } + return queueMaxResource; + } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + // When we doing non-exclusive resource allocation, maximum capacity of + // all queues on this label equals to total resource with the label. 
+ return labelManager.getResourceByLabel(nodePartition, clusterResource); } - return queueMaxResource; + + return Resources.none(); } synchronized boolean canAssignToThisQueue(Resource clusterResource, - Set nodeLabels, ResourceLimits currentResourceLimits, - Resource nowRequired, Resource resourceCouldBeUnreserved) { - // Get label of this queue can access, it's (nodeLabel AND queueLabel) - Set labelCanAccess; - if (null == nodeLabels || nodeLabels.isEmpty()) { - labelCanAccess = new HashSet(); - // Any queue can always access any node without label - labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); - } else { - labelCanAccess = new HashSet( - accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels - : Sets.intersection(accessibleLabels, nodeLabels)); - } - - for (String label : labelCanAccess) { - // New total resource = used + required - Resource newTotalResource = - Resources.add(queueUsage.getUsed(label), nowRequired); - - Resource currentLimitResource = - getCurrentLimitResource(label, clusterResource, currentResourceLimits); - - // if reservation continous looking enabled, check to see if could we - // potentially use this node instead of a reserved node if the application - // has reserved containers. 
- // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking - && label.equals(RMNodeLabelsManager.NO_LABEL) - && Resources.greaterThan(resourceCalculator, clusterResource, - resourceCouldBeUnreserved, Resources.none())) { - // resource-without-reserved = used - reserved - Resource newTotalWithoutReservedResource = - Resources.subtract(newTotalResource, resourceCouldBeUnreserved); - - // when total-used-without-reserved-resource < currentLimit, we still - // have chance to allocate on this node by unreserving some containers - if (Resources.lessThan(resourceCalculator, clusterResource, - newTotalWithoutReservedResource, currentLimitResource)) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed() - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", capacity-without-reserved: " - + newTotalWithoutReservedResource + ", maxLimitCapacity: " - + currentLimitResource); - } - return true; + String nodePartition, ResourceLimits currentResourceLimits, + Resource nowRequired, Resource resourceCouldBeUnreserved, + SchedulingMode schedulingMode) { + // New total resource = used + required + Resource newTotalResource = + Resources.add(queueUsage.getUsed(nodePartition), nowRequired); + + // Get current limited resource: + // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect + // queues' max capacity. + // - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect + // queue's max capacity, queue's max capacity on the partition will be + // considered to be 100%. Which is a queue can use all resource in the + // partition. 
+ // Doing this because: for non-exclusive allocation, we make sure there's + // idle resource on the partition, to avoid wastage, such resource will be + // leveraged as much as we can, and preemption policy will reclaim it back + // when partitioned-resource-request comes back. + Resource currentLimitResource = + getCurrentLimitResource(nodePartition, clusterResource, + currentResourceLimits, schedulingMode); + + // if reservation continuous looking enabled, check to see if could we + // potentially use this node instead of a reserved node if the application + // has reserved containers. + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking + && nodePartition.equals(RMNodeLabelsManager.NO_LABEL) + && Resources.greaterThan(resourceCalculator, clusterResource, + resourceCouldBeUnreserved, Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = + Resources.subtract(newTotalResource, resourceCouldBeUnreserved); + + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to use reserved: " + getQueueName() + + " usedResources: " + queueUsage.getUsed() + + ", clusterResources: " + clusterResource + + ", reservedResources: " + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); } + return true; } - - // Otherwise, if any of the label of this node beyond queue limit, we - // cannot allocate on this node. Consider a small epsilon here. 
- if (Resources.greaterThan(resourceCalculator, clusterResource, - newTotalResource, currentLimitResource)) { - return false; - } + } - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, label=" + label - + " usedResources: " + queueUsage.getUsed(label) - + " clusterResources: " + clusterResource - + " currentUsedCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(label), - labelManager.getResourceByLabel(label, clusterResource)) - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity(label) - + ")"); - } - return true; + // Check if we over current-resource-limit computed. + if (Resources.greaterThan(resourceCalculator, clusterResource, + newTotalResource, currentLimitResource)) { + return false; } - - // Actually, this will not happen, since labelCanAccess will be always - // non-empty - return false; + + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + + "Check assign to queue, nodePartition=" + + nodePartition + + " usedResources: " + + queueUsage.getUsed(nodePartition) + + " clusterResources: " + + clusterResource + + " currentUsedCapacity " + + Resources.divide(resourceCalculator, clusterResource, + queueUsage.getUsed(nodePartition), + labelManager.getResourceByLabel(nodePartition, clusterResource)) + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")"); + } + return true; } @Override @@ -556,4 +550,33 @@ public abstract class AbstractCSQueue implements CSQueue { parent.decPendingResource(nodeLabel, resourceToDec); } } + + /** + * Return if the queue has pending resource on given nodePartition and + * schedulingMode. 
+ */ + boolean hasPendingResourceRequest(String nodePartition, + Resource cluster, SchedulingMode schedulingMode) { + return SchedulerUtils.hasPendingResourceRequest(resourceCalculator, + queueUsage, nodePartition, cluster, schedulingMode); + } + + boolean accessibleToPartition(String nodePartition) { + // if queue's label is *, it can access any node + if (accessibleLabels != null + && accessibleLabels.contains(RMNodeLabelsManager.ANY)) { + return true; + } + // any queue can access to a node without label + if (nodePartition == null + || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + return true; + } + // a queue can access to a node only if it contains any label of the node + if (accessibleLabels != null && accessibleLabels.contains(nodePartition)) { + return true; + } + // sorry, you cannot access + return false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 1a9448a..b06a646 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -190,10 +190,13 @@ extends 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { * @param clusterResource the resource of the cluster. * @param node node on which resources are available * @param resourceLimits how much overall resource of this queue can use. + * @param schedulingMode Type of exclusive check when assign container on a + * NodeManager, see {@link SchedulingMode}. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits resourceLimits); + FiCaSchedulerNode node, ResourceLimits resourceLimits, + SchedulingMode schedulingMode); /** * A container assigned to the queue has completed. http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e93c529..cfeee37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang.StringUtils; 
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -1114,28 +1115,30 @@ public class CapacityScheduler extends if (reservedContainer != null) { FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(reservedContainer.getContainerId()); - + // Try to fulfill the reservation - LOG.info("Trying to fulfill reservation for application " + - reservedApplication.getApplicationId() + " on node: " + - node.getNodeID()); - - LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); - assignment = queue.assignContainers( + LOG.info("Trying to fulfill reservation for application " + + reservedApplication.getApplicationId() + " on node: " + + node.getNodeID()); + + LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); + assignment = + queue.assignContainers( clusterResource, node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. 
new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource))); + RMNodeLabelsManager.NO_LABEL, clusterResource)), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (assignment.isFulfilledReservation()) { CSAssignment tmp = new CSAssignment(reservedContainer.getReservedResource(), - assignment.getType()); + assignment.getType()); Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - reservedContainer.getReservedResource()); + reservedContainer.getReservedResource()); tmp.getAssignmentInformation().addAllocationDetails( - reservedContainer.getContainerId(), queue.getQueuePath()); + reservedContainer.getContainerId(), queue.getQueuePath()); tmp.getAssignmentInformation().incrAllocations(); updateSchedulerHealth(lastNodeUpdateTime, node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); @@ -1143,16 +1146,13 @@ public class CapacityScheduler extends RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { - Container container = excessReservation.getContainer(); - queue.completedContainer( - clusterResource, assignment.getApplication(), node, - excessReservation, - SchedulerUtils.createAbnormalContainerStatus( - container.getId(), - SchedulerUtils.UNRESERVED_CONTAINER), - RMContainerEventType.RELEASED, null, true); + Container container = excessReservation.getContainer(); + queue.completedContainer(clusterResource, assignment.getApplication(), + node, excessReservation, SchedulerUtils + .createAbnormalContainerStatus(container.getId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED, null, true); } - } // Try to schedule more if there are no reservations to fulfill @@ -1163,22 +1163,61 @@ public class CapacityScheduler extends LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); } + assignment = root.assignContainers( clusterResource, node, // TODO, now we only 
consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource))); + RMNodeLabelsManager.NO_LABEL, clusterResource)), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + if (Resources.greaterThan(calculator, clusterResource, + assignment.getResource(), Resources.none())) { + updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + return; + } + + // Only do non-exclusive allocation when node has node-labels. + if (StringUtils.equals(node.getPartition(), + RMNodeLabelsManager.NO_LABEL)) { + return; + } + + // Only do non-exclusive allocation when the node-label supports that + try { + if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( + node.getPartition())) { + return; + } + } catch (IOException e) { + LOG.warn("Exception when trying to get exclusivity of node label=" + + node.getPartition(), e); + return; + } + + // Try to use NON_EXCLUSIVE + assignment = root.assignContainers( + clusterResource, + node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. 
+ new ResourceLimits(labelManager.getResourceByLabel( + RMNodeLabelsManager.NO_LABEL, clusterResource)), + SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + if (Resources.greaterThan(calculator, clusterResource, + assignment.getResource(), Resources.none())) { + return; + } } } else { - LOG.info("Skipping scheduling since node " + node.getNodeID() + - " is reserved by application " + - node.getReservedContainer().getContainerId().getApplicationAttemptId() - ); + LOG.info("Skipping scheduling since node " + + node.getNodeID() + + " is reserved by application " + + node.getReservedContainer().getContainerId() + .getApplicationAttemptId()); } - } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 102e553..4e8d617 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -319,6 +319,11 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur getMaximumApplicationMasterResourcePercent()); } + public void setMaximumApplicationMasterResourcePerQueuePercent(String queue, + float percent) { + setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent); + } + public float getNonLabeledQueueCapacity(String queue) { float capacity = queue.equals("root") ? 100.0f : getFloat( getQueuePrefix(queue) + CAPACITY, UNDEFINED); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 59a016f..8a6a601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -24,7 +24,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -58,6 +57,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -718,39 +718,11 @@ public class LeafQueue extends AbstractCSQueue { ApplicationAttemptId applicationAttemptId) { return applicationAttemptMap.get(applicationAttemptId); } - - private static final CSAssignment NULL_ASSIGNMENT = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); - - private static Set getRequestLabelSetByExpression( - String labelExpression) { - Set labels = new HashSet(); - if (null == labelExpression) { - return labels; - } - for (String l : labelExpression.split("&&")) { - if (l.trim().isEmpty()) { - continue; - } - labels.add(l.trim()); - } - return labels; - } - - private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest, - FiCaSchedulerNode node) { - String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression(); - if (null == askedNodeLabel) { - askedNodeLabel = RMNodeLabelsManager.NO_LABEL; - } - return askedNodeLabel.equals(node.getPartition()); - } @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits currentResourceLimits) { + FiCaSchedulerNode node, ResourceLimits currentResourceLimits, + SchedulingMode schedulingMode) { updateCurrentResourceLimits(currentResourceLimits, clusterResource); if(LOG.isDebugEnabled()) { @@ -758,12 +730,6 @@ public class LeafQueue extends AbstractCSQueue { + " #applications=" + activeApplications.size()); } - // if our queue cannot access this node, just return - if 
(!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, - node.getLabels())) { - return NULL_ASSIGNMENT; - } - // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { @@ -771,8 +737,26 @@ public class LeafQueue extends AbstractCSQueue { getApplication(reservedContainer.getApplicationAttemptId()); synchronized (application) { return assignReservedContainer(application, node, reservedContainer, - clusterResource); + clusterResource, schedulingMode); + } + } + + // if our queue cannot access this node, just return + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY + && !accessibleToPartition(node.getPartition())) { + return NULL_ASSIGNMENT; + } + + // Check if this queue need more resource, simply skip allocation if this + // queue doesn't need more resources. + if (!hasPendingResourceRequest(node.getPartition(), + clusterResource, schedulingMode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-partition=" + node.getPartition()); } + return NULL_ASSIGNMENT; } // Try to assign containers to applications in order @@ -783,6 +767,17 @@ public class LeafQueue extends AbstractCSQueue { + application.getApplicationId()); application.showRequests(); } + + // Check if application needs more resource, skip if it doesn't need more. 
+ if (!application.hasPendingResourceRequest(resourceCalculator, + node.getPartition(), clusterResource, schedulingMode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-label=" + node.getPartition()); + } + continue; + } synchronized (application) { // Check if this resource is on the blacklist @@ -806,10 +801,27 @@ public class LeafQueue extends AbstractCSQueue { continue; } + // AM container allocation doesn't support non-exclusive allocation to + // avoid the pain of preempting an AM container + if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + RMAppAttempt rmAppAttempt = + csContext.getRMContext().getRMApps() + .get(application.getApplicationId()).getCurrentAppAttempt(); + if (null == rmAppAttempt.getMasterContainer()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip allocating AM container to app_attempt=" + + application.getApplicationAttemptId() + + ", don't allow to allocate AM container in non-exclusive mode"); + } + break; + } + } + // Is the node-label-expression of this offswitch resource request // matches the node's label? // If not match, jump to next priority. - if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) { + if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( + anyRequest, node.getPartition(), schedulingMode)) { continue; } @@ -822,10 +834,6 @@ public class LeafQueue extends AbstractCSQueue { } } - Set requestedNodeLabels = - getRequestLabelSetByExpression(anyRequest - .getNodeLabelExpression()); - // Compute user-limit & set headroom // Note: We compute both user-limit & headroom with the highest // priority request as the target. @@ -833,27 +841,61 @@ public class LeafQueue extends AbstractCSQueue { // before all higher priority ones are serviced. 
Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, - required, requestedNodeLabels); + required, node.getPartition(), schedulingMode); // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, node.getLabels(), - this.currentResourceLimits, required, application.getCurrentReservation())) { + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + this.currentResourceLimits, required, + application.getCurrentReservation(), schedulingMode)) { return NULL_ASSIGNMENT; } // Check user limit if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, true, requestedNodeLabels)) { + application, true, node.getPartition())) { break; } // Inform the application it is about to get a scheduling opportunity application.addSchedulingOpportunity(priority); + // Increase missed-non-partitioned-resource-request-opportunity. + // This is to make sure non-partitioned-resource-request will prefer + // to be allocated to non-partitioned nodes + int missedNonPartitionedRequestSchedulingOpportunity = 0; + if (anyRequest.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL)) { + missedNonPartitionedRequestSchedulingOpportunity = + application + .addMissedNonPartitionedRequestSchedulingOpportunity(priority); + } + + if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + // Before doing allocation, we need to check scheduling opportunity to + // make sure : non-partitioned resource request should be scheduled to + // non-partitioned partition first. 
+ if (missedNonPartitionedRequestSchedulingOpportunity < scheduler + .getNumClusterNodes()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip app_attempt=" + + application.getApplicationAttemptId() + + " priority=" + + priority + + " because missed-non-partitioned-resource-request" + + " opportunity under requred:" + + " Now=" + missedNonPartitionedRequestSchedulingOpportunity + + " required=" + + scheduler.getNumClusterNodes()); + } + + break; + } + } + // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null); + null, schedulingMode); // Did the application skip this node? if (assignment.getSkipped()) { @@ -870,9 +912,9 @@ public class LeafQueue extends AbstractCSQueue { // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(clusterResource, application, assigned, - node.getLabels()); + node.getPartition()); - // Don't reset scheduling opportunities for non-local assignments + // Don't reset scheduling opportunities for offswitch assignments // otherwise the app will be delayed for each non-local assignment. // This helps apps with many off-cluster requests schedule faster. if (assignment.getType() != NodeType.OFF_SWITCH) { @@ -881,6 +923,10 @@ public class LeafQueue extends AbstractCSQueue { } application.resetSchedulingOpportunities(priority); } + // Non-exclusive scheduling opportunity is different: we need reset + // it every time to make sure non-labeled resource request will be + // most likely allocated on non-labeled nodes first. 
+ application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority); // Done return assignment; @@ -904,7 +950,8 @@ public class LeafQueue extends AbstractCSQueue { private synchronized CSAssignment assignReservedContainer( FiCaSchedulerApp application, FiCaSchedulerNode node, - RMContainer rmContainer, Resource clusterResource) { + RMContainer rmContainer, Resource clusterResource, + SchedulingMode schedulingMode) { // Do we still need this reservation? Priority priority = rmContainer.getReservedPriority(); if (application.getTotalRequiredResources(priority) == 0) { @@ -915,7 +962,7 @@ public class LeafQueue extends AbstractCSQueue { // Try to assign if we have sufficient resources CSAssignment tmp = assignContainersOnNode(clusterResource, node, application, priority, - rmContainer); + rmContainer, schedulingMode); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* @@ -929,7 +976,8 @@ public class LeafQueue extends AbstractCSQueue { protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application, Resource required) { return getHeadroom(user, queueCurrentLimit, clusterResource, - computeUserLimit(application, clusterResource, required, user, null)); + computeUserLimit(application, clusterResource, required, user, + RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); } private Resource getHeadroom(User user, Resource currentResourceLimit, @@ -973,7 +1021,8 @@ public class LeafQueue extends AbstractCSQueue { @Lock({LeafQueue.class, FiCaSchedulerApp.class}) Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, - Resource clusterResource, Resource required, Set requestedLabels) { + Resource clusterResource, Resource required, String nodePartition, + SchedulingMode schedulingMode) { String user = application.getUser(); User queueUser = getUser(user); @@ -981,7 +1030,7 @@ public class LeafQueue extends 
AbstractCSQueue { // TODO, need consider headroom respect labels also Resource userLimit = computeUserLimit(application, clusterResource, required, - queueUser, requestedLabels); + queueUser, nodePartition, schedulingMode); setQueueResourceLimitsInfo(clusterResource); @@ -1010,34 +1059,18 @@ public class LeafQueue extends AbstractCSQueue { @Lock(NoLock.class) private Resource computeUserLimit(FiCaSchedulerApp application, Resource clusterResource, Resource required, User user, - Set requestedLabels) { + String nodePartition, SchedulingMode schedulingMode) { // What is our current capacity? // * It is equal to the max(required, queue-capacity) if // we're running below capacity. The 'max' ensures that jobs in queues // with miniscule capacity (< 1 slot) make progress // * If we're running over capacity, then its // (usedResources + required) (which extra resources we are allocating) - Resource queueCapacity = Resource.newInstance(0, 0); - if (requestedLabels != null && !requestedLabels.isEmpty()) { - // if we have multiple labels to request, we will choose to use the first - // label - String firstLabel = requestedLabels.iterator().next(); - queueCapacity = - Resources - .max(resourceCalculator, clusterResource, queueCapacity, - Resources.multiplyAndNormalizeUp(resourceCalculator, - labelManager.getResourceByLabel(firstLabel, - clusterResource), - queueCapacities.getAbsoluteCapacity(firstLabel), - minimumAllocation)); - } else { - // else there's no label on request, just to use absolute capacity as - // capacity for nodes without label - queueCapacity = - Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager - .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource), - queueCapacities.getAbsoluteCapacity(), minimumAllocation); - } + Resource queueCapacity = + Resources.multiplyAndNormalizeUp(resourceCalculator, + labelManager.getResourceByLabel(nodePartition, clusterResource), + queueCapacities.getAbsoluteCapacity(nodePartition), + 
minimumAllocation); // Allow progress for queues with miniscule capacity queueCapacity = @@ -1047,33 +1080,56 @@ public class LeafQueue extends AbstractCSQueue { required); Resource currentCapacity = - Resources.lessThan(resourceCalculator, clusterResource, - queueUsage.getUsed(), queueCapacity) ? - queueCapacity : Resources.add(queueUsage.getUsed(), required); + Resources.lessThan(resourceCalculator, clusterResource, + queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity + : Resources.add(queueUsage.getUsed(nodePartition), required); // Never allow a single user to take more than the // queue's configured capacity * user-limit-factor. // Also, the queue's configured capacity should be higher than // queue-hard-limit * ulMin - final int activeUsers = activeUsersManager.getNumActiveUsers(); - - Resource limit = + final int activeUsers = activeUsersManager.getNumActiveUsers(); + + // User limit resource is determined by: + // max{currentCapacity / #activeUsers, currentCapacity * user-limit-percentage%) + Resource userLimitResource = Resources.max( + resourceCalculator, clusterResource, + Resources.divideAndCeil( + resourceCalculator, currentCapacity, activeUsers), + Resources.divideAndCeil( + resourceCalculator, + Resources.multiplyAndRoundDown( + currentCapacity, userLimit), + 100) + ); + + // User limit is capped by maxUserLimit + // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY) + // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY) + // + // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a + // partition, its guaranteed resource on that partition is 0. And + // user-limit-factor computation is based on queue's guaranteed capacity. So + // we will not cap user-limit as well as used resource when doing + // IGNORE_PARTITION_EXCLUSIVITY allocation. 
+ Resource maxUserLimit = Resources.none(); + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + maxUserLimit = + Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor); + } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + maxUserLimit = + labelManager.getResourceByLabel(nodePartition, clusterResource); + } + + // Cap final user limit with maxUserLimit + userLimitResource = Resources.roundUp( resourceCalculator, Resources.min( resourceCalculator, clusterResource, - Resources.max( - resourceCalculator, clusterResource, - Resources.divideAndCeil( - resourceCalculator, currentCapacity, activeUsers), - Resources.divideAndCeil( - resourceCalculator, - Resources.multiplyAndRoundDown( - currentCapacity, userLimit), - 100) - ), - Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor) + userLimitResource, + maxUserLimit ), minimumAllocation); @@ -1081,11 +1137,11 @@ public class LeafQueue extends AbstractCSQueue { String userName = application.getUser(); LOG.debug("User limit computation for " + userName + " in queue " + getQueueName() + - " userLimit=" + userLimit + + " userLimitPercent=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + " consumed: " + user.getUsed() + - " limit: " + limit + + " user-limit-resource: " + userLimitResource + " queueCapacity: " + queueCapacity + " qconsumed: " + queueUsage.getUsed() + " currentCapacity: " + currentCapacity + @@ -1093,31 +1149,26 @@ public class LeafQueue extends AbstractCSQueue { " clusterCapacity: " + clusterResource ); } - user.setUserResourceLimit(limit); - return limit; + user.setUserResourceLimit(userLimitResource); + return userLimitResource; } @Private protected synchronized boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, - boolean checkReservations, Set requestLabels) { + boolean checkReservations, String nodePartition) { User user = getUser(userName); - - 
String label = CommonNodeLabelsManager.NO_LABEL; - if (requestLabels != null && !requestLabels.isEmpty()) { - label = requestLabels.iterator().next(); - } // Note: We aren't considering the current request since there is a fixed // overhead of the AM, but it's a > check, not a >= check, so... if (Resources .greaterThan(resourceCalculator, clusterResource, - user.getUsed(label), + user.getUsed(nodePartition), limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers if (this.reservationsContinueLooking && checkReservations - && label.equals(CommonNodeLabelsManager.NO_LABEL)) { + && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) { if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, @@ -1136,7 +1187,7 @@ public class LeafQueue extends AbstractCSQueue { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() + " will exceed limit - " + " consumed: " - + user.getUsed() + " limit: " + limit); + + user.getUsed(nodePartition) + " limit: " + limit); } return false; } @@ -1176,7 +1227,7 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, SchedulingMode schedulingMode) { CSAssignment assigned; @@ -1190,7 +1241,7 @@ public class LeafQueue extends AbstractCSQueue { assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, schedulingMode); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned.getResource(), Resources.none())) { @@ -1219,7 +1270,7 @@ public class LeafQueue extends AbstractCSQueue { assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, node, application, 
priority, reservedContainer, - allocatedContainer); + allocatedContainer, schedulingMode); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned.getResource(), Resources.none())) { @@ -1248,7 +1299,7 @@ public class LeafQueue extends AbstractCSQueue { assigned = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, schedulingMode); // update locality statistics if (allocatedContainer.getValue() != null) { @@ -1314,16 +1365,17 @@ public class LeafQueue extends AbstractCSQueue { @Private protected boolean checkLimitsToReserve(Resource clusterResource, - FiCaSchedulerApp application, Resource capability) { + FiCaSchedulerApp application, Resource capability, String nodePartition, + SchedulingMode schedulingMode) { // we can't reserve if we got here based on the limit // checks assuming we could unreserve!!! Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, capability, null); + clusterResource, capability, nodePartition, schedulingMode); // Check queue max-capacity limit, // TODO: Consider reservation on labels - if (!canAssignToThisQueue(clusterResource, null, - this.currentResourceLimits, capability, Resources.none())) { + if (!canAssignToThisQueue(clusterResource, RMNodeLabelsManager.NO_LABEL, + this.currentResourceLimits, capability, Resources.none(), schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("was going to reserve but hit queue limit"); } @@ -1332,7 +1384,7 @@ public class LeafQueue extends AbstractCSQueue { // Check user limit if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, false, null)) { + application, false, nodePartition)) { if (LOG.isDebugEnabled()) { LOG.debug("was going to reserve but hit user limit"); } @@ -1345,12 +1397,13 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment assignNodeLocalContainers(Resource clusterResource, 
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + SchedulingMode schedulingMode) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - allocatedContainer); + allocatedContainer, schedulingMode); } return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); @@ -1359,12 +1412,13 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment assignRackLocalContainers(Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + SchedulingMode schedulingMode) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - allocatedContainer); + allocatedContainer, schedulingMode); } return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL); @@ -1373,16 +1427,21 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment assignOffSwitchContainers(Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + SchedulingMode schedulingMode) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, application, 
priority, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - allocatedContainer); + allocatedContainer, schedulingMode); } return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH); } + + private int getActualNodeLocalityDelay() { + return Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay()); + } boolean canAssign(FiCaSchedulerApp application, Priority priority, FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) { @@ -1417,10 +1476,7 @@ public class LeafQueue extends AbstractCSQueue { if (type == NodeType.RACK_LOCAL) { // 'Delay' rack-local just a little bit... long missedOpportunities = application.getSchedulingOpportunities(priority); - return ( - Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay()) < - missedOpportunities - ); + return getActualNodeLocalityDelay() < missedOpportunities; } // Check if we need containers on this host @@ -1460,7 +1516,7 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, - MutableObject createdContainer) { + MutableObject createdContainer, SchedulingMode schedulingMode) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() @@ -1469,9 +1525,8 @@ public class LeafQueue extends AbstractCSQueue { } // check if the resource request can access the label - if (!SchedulerUtils.checkNodeLabelExpression( - node.getLabels(), - request.getNodeLabelExpression())) { + if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request, + node.getPartition(), schedulingMode)) { // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed // We should un-reserve this container. 
@@ -1576,8 +1631,8 @@ public class LeafQueue extends AbstractCSQueue { // If we're trying to reserve a container here, not container will be // unreserved for reserving the new one. Check limits again before // reserve the new container - if (!checkLimitsToReserve(clusterResource, - application, capability)) { + if (!checkLimitsToReserve(clusterResource, + application, capability, node.getPartition(), schedulingMode)) { return new CSAssignment(Resources.none(), type); } } @@ -1666,7 +1721,7 @@ public class LeafQueue extends AbstractCSQueue { // Book-keeping if (removed) { releaseResource(clusterResource, application, - container.getResource(), node.getLabels()); + container.getResource(), node.getPartition()); LOG.info("completedContainer" + " container=" + container + " queue=" + this + @@ -1684,13 +1739,13 @@ public class LeafQueue extends AbstractCSQueue { synchronized void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, - Set nodeLabels) { - super.allocateResource(clusterResource, resource, nodeLabels); + String nodePartition) { + super.allocateResource(clusterResource, resource, nodePartition); // Update user metrics String userName = application.getUser(); User user = getUser(userName); - user.assignContainer(resource, nodeLabels); + user.assignContainer(resource, nodePartition); // Note this is a bit unconventional since it gets the object and modifies // it here, rather then using set routine Resources.subtractFrom(application.getHeadroom(), resource); // headroom @@ -1707,13 +1762,13 @@ public class LeafQueue extends AbstractCSQueue { } synchronized void releaseResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource, Set nodeLabels) { - super.releaseResource(clusterResource, resource, nodeLabels); + FiCaSchedulerApp application, Resource resource, String nodePartition) { + super.releaseResource(clusterResource, resource, nodePartition); // Update user metrics String 
userName = application.getUser(); User user = getUser(userName); - user.releaseContainer(resource, nodeLabels); + user.releaseContainer(resource, nodePartition); metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + @@ -1723,7 +1778,8 @@ public class LeafQueue extends AbstractCSQueue { private void updateAbsoluteCapacityResource(Resource clusterResource) { absoluteCapacityResource = - Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource, + Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), queueCapacities.getAbsoluteCapacity(), minimumAllocation); } @@ -1769,8 +1825,9 @@ public class LeafQueue extends AbstractCSQueue { // Update application properties for (FiCaSchedulerApp application : activeApplications) { synchronized (application) { - computeUserLimitAndSetHeadroom(application, clusterResource, - Resources.none(), null); + computeUserLimitAndSetHeadroom(application, clusterResource, + Resources.none(), RMNodeLabelsManager.NO_LABEL, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } } } @@ -1828,25 +1885,12 @@ public class LeafQueue extends AbstractCSQueue { } } - public void assignContainer(Resource resource, - Set nodeLabels) { - if (nodeLabels == null || nodeLabels.isEmpty()) { - userResourceUsage.incUsed(resource); - } else { - for (String label : nodeLabels) { - userResourceUsage.incUsed(label, resource); - } - } + public void assignContainer(Resource resource, String nodePartition) { + userResourceUsage.incUsed(nodePartition, resource); } - public void releaseContainer(Resource resource, Set nodeLabels) { - if (nodeLabels == null || nodeLabels.isEmpty()) { - userResourceUsage.decUsed(resource); - } else { - for (String label : nodeLabels) { - userResourceUsage.decUsed(label, resource); - } - } + public void releaseContainer(Resource resource, String nodePartition) { + userResourceUsage.decUsed(nodePartition, 
resource); } public Resource getUserResourceLimit() { @@ -1869,7 +1913,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, rmContainer.getContainer() - .getResource(), node.getLabels()); + .getResource(), node.getPartition()); } getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1909,7 +1953,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getLabels()); + .getResource(), node.getPartition()); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() @@ -1927,7 +1971,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getLabels()); + .getResource(), node.getPartition()); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()