Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 03EB218280 for ; Wed, 5 Aug 2015 22:40:41 +0000 (UTC) Received: (qmail 16515 invoked by uid 500); 5 Aug 2015 22:40:40 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 16380 invoked by uid 500); 5 Aug 2015 22:40:40 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 15070 invoked by uid 99); 5 Aug 2015 22:40:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Aug 2015 22:40:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CF3B9E0501; Wed, 5 Aug 2015 22:40:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aw@apache.org To: common-commits@hadoop.apache.org Date: Wed, 05 Aug 2015 22:40:49 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/13] hadoop git commit: YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend container allocation logic. Contributed by Wangda Tan YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend container allocation logic. Contributed by Wangda Tan Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba2313d6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba2313d6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba2313d6 Branch: refs/heads/HADOOP-12111 Commit: ba2313d6145a1234777938a747187373f4cd58d9 Parents: f271d37 Author: Jian He Authored: Wed Aug 5 13:45:17 2015 -0700 Committer: Jian He Committed: Wed Aug 5 13:47:40 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/AbstractCSQueue.java | 8 +- .../scheduler/capacity/CSAssignment.java | 11 +- .../scheduler/capacity/LeafQueue.java | 10 +- .../scheduler/capacity/ParentQueue.java | 4 +- .../capacity/allocator/AllocationState.java | 28 + .../capacity/allocator/ContainerAllocation.java | 76 +++ .../capacity/allocator/ContainerAllocator.java | 115 ++++ .../allocator/RegularContainerAllocator.java | 629 ++++++++++++++++++ .../scheduler/common/fica/FiCaSchedulerApp.java | 658 ++----------------- .../scheduler/capacity/TestLeafQueue.java | 161 ++--- 11 files changed, 1011 insertions(+), 692 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ba023a5..cff7f6b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -391,6 +391,9 @@ Release 2.8.0 - UNRELEASED YARN-2768. Avoid cloning Resource in FSAppAttempt#updateDemand. (Hong Zhiguo via kasha) + YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend + container allocation logic. 
(Wangda Tan via jianhe) + BUG FIXES YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index dcc4205..134b941 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -53,13 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { - private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); - - static final CSAssignment NULL_ASSIGNMENT = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); - + private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); CSQueue parent; final String queueName; volatile int numContainers; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index ceb6f7e..928437f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -24,12 +24,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable public class CSAssignment { + public static final CSAssignment NULL_ASSIGNMENT = + new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - final private Resource resource; + public static 
final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); + + private Resource resource; private NodeType type; private RMContainer excessReservation; private FiCaSchedulerApp application; @@ -67,6 +72,10 @@ public class CSAssignment { public Resource getResource() { return resource; } + + public void setResource(Resource resource) { + this.resource = resource; + } public NodeType getType() { return type; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index acfbad0..a71cc68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -777,7 +777,7 @@ public class LeafQueue extends AbstractCSQueue { // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } // Check if this queue need more resource, simply skip allocation if this @@ -789,7 +789,7 @@ public class LeafQueue extends AbstractCSQueue { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } for (Iterator assignmentIterator = @@ -800,7 +800,7 @@ public class LeafQueue extends AbstractCSQueue { if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } Resource userLimit = @@ -846,11 +846,11 @@ public class LeafQueue extends AbstractCSQueue { } else if (!assignment.getSkipped()) { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } } - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } protected Resource getHeadroom(User user, Resource queueCurrentLimit, http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index e54b9e2..725aea1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -384,7 +384,7 @@ public class ParentQueue extends AbstractCSQueue { // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } // Check if this queue need more resource, simply skip allocation if this @@ -396,7 +396,7 @@ public class ParentQueue extends AbstractCSQueue { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } CSAssignment assignment = http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java new file mode 100644 index 0000000..d1580bd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +public enum AllocationState { + APP_SKIPPED, + PRIORITY_SKIPPED, + LOCALITY_SKIPPED, + QUEUE_SKIPPED, + ALLOCATED, + RESERVED +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java new file mode 100644 index 0000000..00c1bb9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.util.resource.Resources; + +public class ContainerAllocation { + public static final ContainerAllocation PRIORITY_SKIPPED = + new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED); + + public static final ContainerAllocation APP_SKIPPED = + new ContainerAllocation(null, null, AllocationState.APP_SKIPPED); + + public static final ContainerAllocation QUEUE_SKIPPED = + new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED); + + public static final ContainerAllocation LOCALITY_SKIPPED = + new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED); + + RMContainer containerToBeUnreserved; + private Resource resourceToBeAllocated = Resources.none(); + AllocationState state; + NodeType containerNodeType = NodeType.NODE_LOCAL; + NodeType requestNodeType = NodeType.NODE_LOCAL; + Container updatedContainer; + + public ContainerAllocation(RMContainer containerToBeUnreserved, + Resource resourceToBeAllocated, AllocationState state) { + this.containerToBeUnreserved = containerToBeUnreserved; + this.resourceToBeAllocated = resourceToBeAllocated; + this.state = state; + } + + public RMContainer getContainerToBeUnreserved() { + return containerToBeUnreserved; + } + + public Resource getResourceToBeAllocated() { + if (resourceToBeAllocated == null) { + return Resources.none(); + } + return resourceToBeAllocated; + } + + public AllocationState getAllocationState() { + return state; + } + + public NodeType getContainerNodeType() { + return containerNodeType; + } + + public Container getUpdatedContainer() { + return updatedContainer; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java new file mode 100644 index 0000000..b4168dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * For an application, resource limits and resource requests, decide how to + * allocate container. This is to make application resource allocation logic + * extensible. + */ +public abstract class ContainerAllocator { + FiCaSchedulerApp application; + final ResourceCalculator rc; + final RMContext rmContext; + + public ContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext) { + this.application = application; + this.rc = rc; + this.rmContext = rmContext; + } + + /** + * preAllocation is to perform checks, etc. to see if we can/cannot allocate + * container. It will put necessary information to returned + * {@link ContainerAllocation}. + */ + abstract ContainerAllocation preAllocation( + Resource clusterResource, FiCaSchedulerNode node, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + Priority priority, RMContainer reservedContainer); + + /** + * doAllocation is to update application metrics, create containers, etc. + * According to allocating conclusion decided by preAllocation. + */ + abstract ContainerAllocation doAllocation( + ContainerAllocation allocationResult, Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority, + RMContainer reservedContainer); + + boolean checkHeadroom(Resource clusterResource, + ResourceLimits currentResourceLimits, Resource required, + FiCaSchedulerNode node) { + // If headroom + currentReservation < required, we cannot allocate this + // require + Resource resourceCouldBeUnReserved = application.getCurrentReservation(); + if (!application.getCSLeafQueue().getReservationContinueLooking() + || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + // If we don't allow reservation continuous looking, OR we're looking at + // non-default node partition, we won't allow to unreserve before + // allocation. + resourceCouldBeUnReserved = Resources.none(); + } + return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add( + currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), + required); + } + + /** + * allocate needs to handle following stuffs: + * + *
<ul>
+ * <li>Select request: Select a request to allocate. E.g. select a resource
+ * request based on requirement/priority/locality.</li>
+ * <li>Check if a given resource can be allocated based on resource
+ * availability</li>
+ * <li>Do allocation: this will decide/create allocated/reserved
+ * container, this will also update metrics</li>
+ * </ul>
+ */ + public ContainerAllocation allocate(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, Priority priority, + RMContainer reservedContainer) { + ContainerAllocation result = + preAllocation(clusterResource, node, schedulingMode, + resourceLimits, priority, reservedContainer); + + if (AllocationState.ALLOCATED == result.state + || AllocationState.RESERVED == result.state) { + result = doAllocation(result, clusterResource, node, + schedulingMode, priority, reservedContainer); + } + + return result; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java new file mode 100644 index 0000000..6effcd3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -0,0 +1,629 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Allocate normal (new) containers, considers locality/label, etc. Using + * delayed scheduling mechanism to get better locality allocation. + */ +public class RegularContainerAllocator extends ContainerAllocator { + private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); + + private ResourceRequest lastResourceRequest = null; + + public RegularContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext) { + super(application, rc, rmContext); + } + + private ContainerAllocation preCheckForNewContainer(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, Priority priority) { + if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) { + return ContainerAllocation.APP_SKIPPED; + } + + ResourceRequest anyRequest = + application.getResourceRequest(priority, ResourceRequest.ANY); + if (null == anyRequest) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + + // Required resource + Resource required = anyRequest.getCapability(); + + // Do we need containers at this 'priority'? 
+ if (application.getTotalRequiredResources(priority) <= 0) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + + // AM container allocation doesn't support non-exclusive allocation to + // avoid painful of preempt an AM container + if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + RMAppAttempt rmAppAttempt = + rmContext.getRMApps().get(application.getApplicationId()) + .getCurrentAppAttempt(); + if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false + && null == rmAppAttempt.getMasterContainer()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip allocating AM container to app_attempt=" + + application.getApplicationAttemptId() + + ", don't allow to allocate AM container in non-exclusive mode"); + } + return ContainerAllocation.APP_SKIPPED; + } + } + + // Is the node-label-expression of this offswitch resource request + // matches the node's label? + // If not match, jump to next priority. + if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(anyRequest, + node.getPartition(), schedulingMode)) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + + if (!application.getCSLeafQueue().getReservationContinueLooking()) { + if (!shouldAllocOrReserveNewContainer(priority, required)) { + if (LOG.isDebugEnabled()) { + LOG.debug("doesn't need containers based on reservation algo!"); + } + return ContainerAllocation.PRIORITY_SKIPPED; + } + } + + if (!checkHeadroom(clusterResource, resourceLimits, required, node)) { + if (LOG.isDebugEnabled()) { + LOG.debug("cannot allocate required resource=" + required + + " because of headroom"); + } + return ContainerAllocation.QUEUE_SKIPPED; + } + + // Inform the application it is about to get a scheduling opportunity + application.addSchedulingOpportunity(priority); + + // Increase missed-non-partitioned-resource-request-opportunity. + // This is to make sure non-partitioned-resource-request will prefer + // to be allocated to non-partitioned nodes + int missedNonPartitionedRequestSchedulingOpportunity = 0; + if (anyRequest.getNodeLabelExpression() + .equals(RMNodeLabelsManager.NO_LABEL)) { + missedNonPartitionedRequestSchedulingOpportunity = + application + .addMissedNonPartitionedRequestSchedulingOpportunity(priority); + } + + if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + // Before doing allocation, we need to check scheduling opportunity to + // make sure : non-partitioned resource request should be scheduled to + // non-partitioned partition first. 
+ if (missedNonPartitionedRequestSchedulingOpportunity < rmContext + .getScheduler().getNumClusterNodes()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + + " priority=" + priority + + " because missed-non-partitioned-resource-request" + + " opportunity under requred:" + " Now=" + + missedNonPartitionedRequestSchedulingOpportunity + " required=" + + rmContext.getScheduler().getNumClusterNodes()); + } + + return ContainerAllocation.APP_SKIPPED; + } + } + + return null; + } + + @Override + ContainerAllocation preAllocation(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, Priority priority, + RMContainer reservedContainer) { + ContainerAllocation result; + if (null == reservedContainer) { + // pre-check when allocating new container + result = + preCheckForNewContainer(clusterResource, node, schedulingMode, + resourceLimits, priority); + if (null != result) { + return result; + } + } else { + // pre-check when allocating reserved container + if (application.getTotalRequiredResources(priority) == 0) { + // Release + return new ContainerAllocation(reservedContainer, null, + AllocationState.QUEUE_SKIPPED); + } + } + + // Try to allocate containers on node + result = + assignContainersOnNode(clusterResource, node, priority, + reservedContainer, schedulingMode, resourceLimits); + + if (null == reservedContainer) { + if (result.state == AllocationState.PRIORITY_SKIPPED) { + // Don't count 'skipped nodes' as a scheduling opportunity! + application.subtractSchedulingOpportunity(priority); + } + } + + return result; + } + + public synchronized float getLocalityWaitFactor( + Priority priority, int clusterNodes) { + // Estimate: Required unique resources (i.e. hosts + racks) + int requiredResources = + Math.max(application.getResourceRequests(priority).size() - 1, 0); + + // waitFactor can't be more than '1' + // i.e. no point skipping more than clustersize opportunities + return Math.min(((float)requiredResources / clusterNodes), 1.0f); + } + + private int getActualNodeLocalityDelay() { + return Math.min(rmContext.getScheduler().getNumClusterNodes(), application + .getCSLeafQueue().getNodeLocalityDelay()); + } + + private boolean canAssign(Priority priority, FiCaSchedulerNode node, + NodeType type, RMContainer reservedContainer) { + + // Clearly we need containers for this application... + if (type == NodeType.OFF_SWITCH) { + if (reservedContainer != null) { + return true; + } + + // 'Delay' off-switch + ResourceRequest offSwitchRequest = + application.getResourceRequest(priority, ResourceRequest.ANY); + long missedOpportunities = application.getSchedulingOpportunities(priority); + long requiredContainers = offSwitchRequest.getNumContainers(); + + float localityWaitFactor = + getLocalityWaitFactor(priority, rmContext.getScheduler() + .getNumClusterNodes()); + + return ((requiredContainers * localityWaitFactor) < missedOpportunities); + } + + // Check if we need containers on this rack + ResourceRequest rackLocalRequest = + application.getResourceRequest(priority, node.getRackName()); + if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) { + return false; + } + + // If we are here, we do need containers on this rack for RACK_LOCAL req + if (type == NodeType.RACK_LOCAL) { + // 'Delay' rack-local just a little bit... 
+ long missedOpportunities = application.getSchedulingOpportunities(priority); + return getActualNodeLocalityDelay() < missedOpportunities; + } + + // Check if we need containers on this host + if (type == NodeType.NODE_LOCAL) { + // Now check if we need containers on this host... + ResourceRequest nodeLocalRequest = + application.getResourceRequest(priority, node.getNodeName()); + if (nodeLocalRequest != null) { + return nodeLocalRequest.getNumContainers() > 0; + } + } + + return false; + } + + private ContainerAllocation assignNodeLocalContainers( + Resource clusterResource, ResourceRequest nodeLocalResourceRequest, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) { + return assignContainer(clusterResource, node, priority, + nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, + schedulingMode, currentResoureLimits); + } + + // Skip node-local request, go to rack-local request + return ContainerAllocation.LOCALITY_SKIPPED; + } + + private ContainerAllocation assignRackLocalContainers( + Resource clusterResource, ResourceRequest rackLocalResourceRequest, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) { + return assignContainer(clusterResource, node, priority, + rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, + schedulingMode, currentResoureLimits); + } + + // Skip rack-local request, go to off-switch request + return ContainerAllocation.LOCALITY_SKIPPED; + } + + private ContainerAllocation assignOffSwitchContainers( + Resource clusterResource, ResourceRequest offSwitchResourceRequest, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) { + return assignContainer(clusterResource, node, priority, + offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, + schedulingMode, currentResoureLimits); + } + + return ContainerAllocation.QUEUE_SKIPPED; + } + + private ContainerAllocation assignContainersOnNode(Resource clusterResource, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + + ContainerAllocation assigned; + + NodeType requestType = null; + // Data-local + ResourceRequest nodeLocalResourceRequest = + application.getResourceRequest(priority, node.getNodeName()); + if (nodeLocalResourceRequest != null) { + requestType = NodeType.NODE_LOCAL; + assigned = + assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, + node, priority, reservedContainer, schedulingMode, + currentResoureLimits); + if (Resources.greaterThan(rc, clusterResource, + assigned.getResourceToBeAllocated(), Resources.none())) { + assigned.requestNodeType = requestType; + return assigned; + } + } + + // Rack-local + ResourceRequest rackLocalResourceRequest = + application.getResourceRequest(priority, node.getRackName()); + if (rackLocalResourceRequest != null) { + if (!rackLocalResourceRequest.getRelaxLocality()) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + + if (requestType != NodeType.NODE_LOCAL) { + requestType = NodeType.RACK_LOCAL; + } + + assigned = + 
assignRackLocalContainers(clusterResource, rackLocalResourceRequest, + node, priority, reservedContainer, schedulingMode, + currentResoureLimits); + if (Resources.greaterThan(rc, clusterResource, + assigned.getResourceToBeAllocated(), Resources.none())) { + assigned.requestNodeType = requestType; + return assigned; + } + } + + // Off-switch + ResourceRequest offSwitchResourceRequest = + application.getResourceRequest(priority, ResourceRequest.ANY); + if (offSwitchResourceRequest != null) { + if (!offSwitchResourceRequest.getRelaxLocality()) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + if (requestType != NodeType.NODE_LOCAL + && requestType != NodeType.RACK_LOCAL) { + requestType = NodeType.OFF_SWITCH; + } + + assigned = + assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, + node, priority, reservedContainer, schedulingMode, + currentResoureLimits); + assigned.requestNodeType = requestType; + + return assigned; + } + + return ContainerAllocation.PRIORITY_SKIPPED; + } + + private ContainerAllocation assignContainer(Resource clusterResource, + FiCaSchedulerNode node, Priority priority, ResourceRequest request, + NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode, + ResourceLimits currentResoureLimits) { + lastResourceRequest = request; + + if (LOG.isDebugEnabled()) { + LOG.debug("assignContainers: node=" + node.getNodeName() + + " application=" + application.getApplicationId() + + " priority=" + priority.getPriority() + + " request=" + request + " type=" + type); + } + + // check if the resource request can access the label + if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request, + node.getPartition(), schedulingMode)) { + // this is a reserved container, but we cannot allocate it now according + // to label not match. This can be caused by node label changed + // We should un-reserve this container. + return new ContainerAllocation(rmContainer, null, + AllocationState.QUEUE_SKIPPED); + } + + Resource capability = request.getCapability(); + Resource available = node.getAvailableResource(); + Resource totalResource = node.getTotalResource(); + + if (!Resources.lessThanOrEqual(rc, clusterResource, + capability, totalResource)) { + LOG.warn("Node : " + node.getNodeID() + + " does not have sufficient resource for request : " + request + + " node total capability : " + node.getTotalResource()); + return ContainerAllocation.QUEUE_SKIPPED; + } + + assert Resources.greaterThan( + rc, clusterResource, available, Resources.none()); + + boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( + priority, capability); + + // Can we allocate a container on this node? + int availableContainers = + rc.computeAvailableContainers(available, capability); + + // How much need to unreserve equals to: + // max(required - headroom, amountNeedUnreserve) + Resource resourceNeedToUnReserve = + Resources.max(rc, clusterResource, + Resources.subtract(capability, currentResoureLimits.getHeadroom()), + currentResoureLimits.getAmountNeededUnreserve()); + + boolean needToUnreserve = + Resources.greaterThan(rc, clusterResource, + resourceNeedToUnReserve, Resources.none()); + + RMContainer unreservedContainer = null; + boolean reservationsContinueLooking = + application.getCSLeafQueue().getReservationContinueLooking(); + + if (availableContainers > 0) { + // Allocate... 
+ // We will only do continuous reservation when this is not allocated from + // reserved container + if (rmContainer == null && reservationsContinueLooking + && node.getLabels().isEmpty()) { + // when reservationsContinueLooking is set, we may need to unreserve + // some containers to meet this queue, its parents', or the users' + // resource limits. + // TODO, need change here when we want to support continuous reservation + // looking for labeled partitions. + if (!shouldAllocOrReserveNewContainer || needToUnreserve) { + if (!needToUnreserve) { + // If we shouldn't allocate/reserve new container then we should + // unreserve one the same size we are asking for since the + // currentResoureLimits.getAmountNeededUnreserve could be zero. If + // the limit was hit then use the amount we need to unreserve to be + // under the limit. + resourceNeedToUnReserve = capability; + } + unreservedContainer = + application.findNodeToUnreserve(clusterResource, node, priority, + resourceNeedToUnReserve); + // When (minimum-unreserved-resource > 0 OR we cannot allocate + // new/reserved + // container (That means we *have to* unreserve some resource to + // continue)). If we failed to unreserve some resource, we can't + // continue. + if (null == unreservedContainer) { + return ContainerAllocation.QUEUE_SKIPPED; + } + } + } + + ContainerAllocation result = + new ContainerAllocation(unreservedContainer, request.getCapability(), + AllocationState.ALLOCATED); + result.containerNodeType = type; + return result; + } else { + // if we are allowed to allocate but this node doesn't have space, reserve it or + // if this was an already a reserved container, reserve it again + if (shouldAllocOrReserveNewContainer || rmContainer != null) { + + if (reservationsContinueLooking && rmContainer == null) { + // we could possibly ignoring queue capacity or user limits when + // reservationsContinueLooking is set. Make sure we didn't need to unreserve + // one. + if (needToUnreserve) { + if (LOG.isDebugEnabled()) { + LOG.debug("we needed to unreserve to be able to allocate"); + } + return ContainerAllocation.QUEUE_SKIPPED; + } + } + + ContainerAllocation result = + new ContainerAllocation(null, request.getCapability(), + AllocationState.RESERVED); + result.containerNodeType = type; + return result; + } + return ContainerAllocation.QUEUE_SKIPPED; + } + } + + boolean + shouldAllocOrReserveNewContainer(Priority priority, Resource required) { + int requiredContainers = application.getTotalRequiredResources(priority); + int reservedContainers = application.getNumReservedContainers(priority); + int starvation = 0; + if (reservedContainers > 0) { + float nodeFactor = + Resources + .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation()); + + // Use percentage of node required to bias against large containers... 
+ // Protect against corner case where you need the whole node with + // Math.min(nodeFactor, minimumAllocationFactor) + starvation = + (int) ((application.getReReservations(priority) / + (float) reservedContainers) * (1.0f - (Math.min( + nodeFactor, application.getCSLeafQueue() + .getMinimumAllocationFactor())))); + + if (LOG.isDebugEnabled()) { + LOG.debug("needsContainers:" + " app.#re-reserve=" + + application.getReReservations(priority) + " reserved=" + + reservedContainers + " nodeFactor=" + nodeFactor + + " minAllocFactor=" + + application.getCSLeafQueue().getMinimumAllocationFactor() + + " starvation=" + starvation); + } + } + return (((starvation + requiredContainers) - reservedContainers) > 0); + } + + private Container getContainer(RMContainer rmContainer, + FiCaSchedulerNode node, Resource capability, Priority priority) { + return (rmContainer != null) ? rmContainer.getContainer() + : createContainer(node, capability, priority); + } + + private Container createContainer(FiCaSchedulerNode node, Resource capability, + Priority priority) { + + NodeId nodeId = node.getRMNode().getNodeID(); + ContainerId containerId = + BuilderUtils.newContainerId(application.getApplicationAttemptId(), + application.getNewContainerId()); + + // Create the container + return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() + .getHttpAddress(), capability, priority, null); + } + + private ContainerAllocation handleNewContainerAllocation( + ContainerAllocation allocationResult, FiCaSchedulerNode node, + Priority priority, RMContainer reservedContainer, Container container) { + // Handling container allocation + // Did we previously reserve containers at this 'priority'? + if (reservedContainer != null) { + application.unreserve(priority, node, reservedContainer); + } + + // Inform the application + RMContainer allocatedContainer = + application.allocate(allocationResult.containerNodeType, node, + priority, lastResourceRequest, container); + + // Does the application need this resource? + if (allocatedContainer == null) { + // Skip this app if we failed to allocate. 
+ ContainerAllocation ret = + new ContainerAllocation(allocationResult.containerToBeUnreserved, + null, AllocationState.QUEUE_SKIPPED); + ret.state = AllocationState.APP_SKIPPED; + return ret; + } + + // Inform the node + node.allocateContainer(allocatedContainer); + + // update locality statistics + application.incNumAllocatedContainers(allocationResult.containerNodeType, + allocationResult.requestNodeType); + + return allocationResult; + } + + @Override + ContainerAllocation doAllocation(ContainerAllocation allocationResult, + Resource clusterResource, FiCaSchedulerNode node, + SchedulingMode schedulingMode, Priority priority, + RMContainer reservedContainer) { + // Create the container if necessary + Container container = + getContainer(reservedContainer, node, + allocationResult.getResourceToBeAllocated(), priority); + + // something went wrong getting/creating the container + if (container == null) { + LOG.warn("Couldn't get container for allocation!"); + return ContainerAllocation.QUEUE_SKIPPED; + } + + if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) { + // When allocating container + allocationResult = + handleNewContainerAllocation(allocationResult, node, priority, + reservedContainer, container); + } else { + // When reserving container + application.reserve(priority, node, reservedContainer, container); + } + allocationResult.updatedContainer = container; + + // Only reset opportunities when we FIRST allocate the container. (IAW, When + // reservedContainer != null, it's not the first time) + if (reservedContainer == null) { + // Don't reset scheduling opportunities for off-switch assignments + // otherwise the app will be delayed for each non-local assignment. + // This helps apps with many off-cluster requests schedule faster. + if (allocationResult.containerNodeType != NodeType.OFF_SWITCH) { + if (LOG.isDebugEnabled()) { + LOG.debug("Resetting scheduling opportunities"); + } + application.resetSchedulingOpportunities(priority); + } + + // Non-exclusive scheduling opportunity is different: we need reset + // it every time to make sure non-labeled resource request will be + // most likely allocated on non-labeled nodes first. 
+ application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority); + } + + return allocationResult; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index c660fcb..d75b2c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.lang.mutable.MutableObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -40,9 +39,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -54,15 +51,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -78,11 +76,6 @@ import com.google.common.annotations.VisibleForTesting; public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class); - static final CSAssignment NULL_ASSIGNMENT = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); - private final Set containersToPreempt = new HashSet(); @@ -91,6 +84,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private ResourceCalculator rc = new DefaultResourceCalculator(); private ResourceScheduler scheduler; + + private ContainerAllocator containerAllocator; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -124,6 +119,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { if (scheduler.getResourceCalculator() != null) { rc = scheduler.getResourceCalculator(); } + + containerAllocator = new RegularContainerAllocator(this, rc, rmContext); } synchronized public boolean containerCompleted(RMContainer rmContainer, @@ -386,223 +383,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { ((FiCaSchedulerApp) appAttempt).getHeadroomProvider(); } - private int getActualNodeLocalityDelay() { - return Math.min(scheduler.getNumClusterNodes(), getCSLeafQueue() - .getNodeLocalityDelay()); - } - - private boolean canAssign(Priority priority, FiCaSchedulerNode node, - NodeType type, RMContainer reservedContainer) { - - // Clearly we need containers for this application... - if (type == NodeType.OFF_SWITCH) { - if (reservedContainer != null) { - return true; - } - - // 'Delay' off-switch - ResourceRequest offSwitchRequest = - getResourceRequest(priority, ResourceRequest.ANY); - long missedOpportunities = getSchedulingOpportunities(priority); - long requiredContainers = offSwitchRequest.getNumContainers(); - - float localityWaitFactor = - getLocalityWaitFactor(priority, scheduler.getNumClusterNodes()); - - return ((requiredContainers * localityWaitFactor) < missedOpportunities); - } - - // Check if we need containers on this rack - ResourceRequest rackLocalRequest = - getResourceRequest(priority, node.getRackName()); - if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) { - return false; - } - - // If we are here, we do need containers on this rack for RACK_LOCAL req - if (type == NodeType.RACK_LOCAL) { - // 'Delay' rack-local just a little bit... - long missedOpportunities = getSchedulingOpportunities(priority); - return getActualNodeLocalityDelay() < missedOpportunities; - } - - // Check if we need containers on this host - if (type == NodeType.NODE_LOCAL) { - // Now check if we need containers on this host... 
- ResourceRequest nodeLocalRequest = - getResourceRequest(priority, node.getNodeName()); - if (nodeLocalRequest != null) { - return nodeLocalRequest.getNumContainers() > 0; - } - } - - return false; - } - - boolean - shouldAllocOrReserveNewContainer(Priority priority, Resource required) { - int requiredContainers = getTotalRequiredResources(priority); - int reservedContainers = getNumReservedContainers(priority); - int starvation = 0; - if (reservedContainers > 0) { - float nodeFactor = - Resources.ratio( - rc, required, getCSLeafQueue().getMaximumAllocation() - ); - - // Use percentage of node required to bias against large containers... - // Protect against corner case where you need the whole node with - // Math.min(nodeFactor, minimumAllocationFactor) - starvation = - (int)((getReReservations(priority) / (float)reservedContainers) * - (1.0f - (Math.min(nodeFactor, getCSLeafQueue().getMinimumAllocationFactor()))) - ); - - if (LOG.isDebugEnabled()) { - LOG.debug("needsContainers:" + - " app.#re-reserve=" + getReReservations(priority) + - " reserved=" + reservedContainers + - " nodeFactor=" + nodeFactor + - " minAllocFactor=" + getCSLeafQueue().getMinimumAllocationFactor() + - " starvation=" + starvation); - } - } - return (((starvation + requiredContainers) - reservedContainers) > 0); - } - - private CSAssignment assignNodeLocalContainers(Resource clusterResource, - ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, - Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.NODE_LOCAL, - reservedContainer)) { - return assignContainer(clusterResource, node, priority, - nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - } - - return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); - } - - private CSAssignment assignRackLocalContainers(Resource clusterResource, - ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, - Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.RACK_LOCAL, - reservedContainer)) { - return assignContainer(clusterResource, node, priority, - rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - } - - return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL); - } - - private CSAssignment assignOffSwitchContainers(Resource clusterResource, - ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, - Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.OFF_SWITCH, - reservedContainer)) { - return assignContainer(clusterResource, node, priority, - offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - } - - return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH); - } - - private CSAssignment assignContainersOnNode(Resource clusterResource, - FiCaSchedulerNode node, Priority priority, - RMContainer reservedContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { - - CSAssignment assigned; - - NodeType requestType = null; - MutableObject 
allocatedContainer = new MutableObject(); - // Data-local - ResourceRequest nodeLocalResourceRequest = - getResourceRequest(priority, node.getNodeName()); - if (nodeLocalResourceRequest != null) { - requestType = NodeType.NODE_LOCAL; - assigned = - assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, - node, priority, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - if (Resources.greaterThan(rc, clusterResource, - assigned.getResource(), Resources.none())) { - - //update locality statistics - if (allocatedContainer.getValue() != null) { - incNumAllocatedContainers(NodeType.NODE_LOCAL, - requestType); - } - assigned.setType(NodeType.NODE_LOCAL); - return assigned; - } - } - - // Rack-local - ResourceRequest rackLocalResourceRequest = - getResourceRequest(priority, node.getRackName()); - if (rackLocalResourceRequest != null) { - if (!rackLocalResourceRequest.getRelaxLocality()) { - return SKIP_ASSIGNMENT; - } - - if (requestType != NodeType.NODE_LOCAL) { - requestType = NodeType.RACK_LOCAL; - } - - assigned = - assignRackLocalContainers(clusterResource, rackLocalResourceRequest, - node, priority, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - if (Resources.greaterThan(rc, clusterResource, - assigned.getResource(), Resources.none())) { - - //update locality statistics - if (allocatedContainer.getValue() != null) { - incNumAllocatedContainers(NodeType.RACK_LOCAL, - requestType); - } - assigned.setType(NodeType.RACK_LOCAL); - return assigned; - } - } - - // Off-switch - ResourceRequest offSwitchResourceRequest = - getResourceRequest(priority, ResourceRequest.ANY); - if (offSwitchResourceRequest != null) { - if (!offSwitchResourceRequest.getRelaxLocality()) { - return SKIP_ASSIGNMENT; - } - if (requestType != NodeType.NODE_LOCAL - && requestType != NodeType.RACK_LOCAL) { - requestType = NodeType.OFF_SWITCH; - } - - assigned = - assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, priority, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - - // update locality statistics - if (allocatedContainer.getValue() != null) { - incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType); - } - assigned.setType(NodeType.OFF_SWITCH); - return assigned; - } - - return SKIP_ASSIGNMENT; - } - public void reserve(Priority priority, FiCaSchedulerNode node, RMContainer rmContainer, Container container) { // Update reserved metrics if this is the first reservation @@ -618,25 +398,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { node.reserveResource(this, priority, rmContainer); } - private Container getContainer(RMContainer rmContainer, - FiCaSchedulerNode node, Resource capability, Priority priority) { - return (rmContainer != null) ? 
rmContainer.getContainer() - : createContainer(node, capability, priority); - } - - Container createContainer(FiCaSchedulerNode node, Resource capability, - Priority priority) { - - NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = - BuilderUtils.newContainerId(getApplicationAttemptId(), - getNewContainerId()); - - // Create the container - return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, null); - } - @VisibleForTesting public RMContainer findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, Priority priority, @@ -672,203 +433,63 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return nodeToUnreserve.getReservedContainer(); } - private LeafQueue getCSLeafQueue() { + public LeafQueue getCSLeafQueue() { return (LeafQueue)queue; } - private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node, - Priority priority, - ResourceRequest request, NodeType type, RMContainer rmContainer, - MutableObject createdContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { - if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getNodeName() - + " application=" + getApplicationId() - + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type); - } - - // check if the resource request can access the label - if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request, - node.getPartition(), schedulingMode)) { - // this is a reserved container, but we cannot allocate it now according - // to label not match. This can be caused by node label changed - // We should un-reserve this container. - if (rmContainer != null) { - unreserve(priority, node, rmContainer); - } - return new CSAssignment(Resources.none(), type); - } - - Resource capability = request.getCapability(); - Resource available = node.getAvailableResource(); - Resource totalResource = node.getTotalResource(); - - if (!Resources.lessThanOrEqual(rc, clusterResource, - capability, totalResource)) { - LOG.warn("Node : " + node.getNodeID() - + " does not have sufficient resource for request : " + request - + " node total capability : " + node.getTotalResource()); - return new CSAssignment(Resources.none(), type); - } - - assert Resources.greaterThan( - rc, clusterResource, available, Resources.none()); - - // Create the container if necessary - Container container = - getContainer(rmContainer, node, capability, priority); - - // something went wrong getting/creating the container - if (container == null) { - LOG.warn("Couldn't get container for allocation!"); - return new CSAssignment(Resources.none(), type); - } - - boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( - priority, capability); - - // Can we allocate a container on this node? 
- int availableContainers = - rc.computeAvailableContainers(available, capability); - - // How much need to unreserve equals to: - // max(required - headroom, amountNeedUnreserve) - Resource resourceNeedToUnReserve = - Resources.max(rc, clusterResource, - Resources.subtract(capability, currentResoureLimits.getHeadroom()), - currentResoureLimits.getAmountNeededUnreserve()); - - boolean needToUnreserve = - Resources.greaterThan(rc, clusterResource, - resourceNeedToUnReserve, Resources.none()); - - RMContainer unreservedContainer = null; - boolean reservationsContinueLooking = - getCSLeafQueue().getReservationContinueLooking(); - - if (availableContainers > 0) { - // Allocate... - - // Did we previously reserve containers at this 'priority'? - if (rmContainer != null) { - unreserve(priority, node, rmContainer); - } else if (reservationsContinueLooking && node.getLabels().isEmpty()) { - // when reservationsContinueLooking is set, we may need to unreserve - // some containers to meet this queue, its parents', or the users' resource limits. - // TODO, need change here when we want to support continuous reservation - // looking for labeled partitions. - if (!shouldAllocOrReserveNewContainer || needToUnreserve) { - if (!needToUnreserve) { - // If we shouldn't allocate/reserve new container then we should - // unreserve one the same size we are asking for since the - // currentResoureLimits.getAmountNeededUnreserve could be zero. If - // the limit was hit then use the amount we need to unreserve to be - // under the limit. - resourceNeedToUnReserve = capability; - } - unreservedContainer = - findNodeToUnreserve(clusterResource, node, priority, - resourceNeedToUnReserve); - // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved - // container (That means we *have to* unreserve some resource to - // continue)). If we failed to unreserve some resource, we can't continue. - if (null == unreservedContainer) { - return new CSAssignment(Resources.none(), type); - } - } - } - - // Inform the application - RMContainer allocatedContainer = - allocate(type, node, priority, request, container); - - // Does the application need this resource? 
- if (allocatedContainer == null) { - CSAssignment csAssignment = new CSAssignment(Resources.none(), type); - csAssignment.setApplication(this); - csAssignment.setExcessReservation(unreservedContainer); - return csAssignment; - } - - // Inform the node - node.allocateContainer(allocatedContainer); - - // Inform the ordering policy - getCSLeafQueue().getOrderingPolicy().containerAllocated(this, - allocatedContainer); - - LOG.info("assignedContainer" + - " application attempt=" + getApplicationAttemptId() + - " container=" + container + - " queue=" + this + - " clusterResource=" + clusterResource); - createdContainer.setValue(allocatedContainer); - CSAssignment assignment = new CSAssignment(container.getResource(), type); - assignment.getAssignmentInformation().addAllocationDetails( - container.getId(), getCSLeafQueue().getQueuePath()); - assignment.getAssignmentInformation().incrAllocations(); - assignment.setApplication(this); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - container.getResource()); - - assignment.setExcessReservation(unreservedContainer); - return assignment; - } else { - // if we are allowed to allocate but this node doesn't have space, reserve it or - // if this was an already a reserved container, reserve it again - if (shouldAllocOrReserveNewContainer || rmContainer != null) { - - if (reservationsContinueLooking && rmContainer == null) { - // we could possibly ignoring queue capacity or user limits when - // reservationsContinueLooking is set. Make sure we didn't need to unreserve - // one. - if (needToUnreserve) { - if (LOG.isDebugEnabled()) { - LOG.debug("we needed to unreserve to be able to allocate"); - } - return new CSAssignment(Resources.none(), type); - } - } - - // Reserve by 'charging' in advance... 
- reserve(priority, node, rmContainer, container); - - LOG.info("Reserved container " + - " application=" + getApplicationId() + - " resource=" + request.getCapability() + - " queue=" + this.toString() + - " cluster=" + clusterResource); - CSAssignment assignment = - new CSAssignment(request.getCapability(), type); + private CSAssignment getCSAssignmentFromAllocateResult( + Resource clusterResource, ContainerAllocation result) { + // Handle skipped + boolean skipped = + (result.getAllocationState() == AllocationState.APP_SKIPPED); + CSAssignment assignment = new CSAssignment(skipped); + assignment.setApplication(this); + + // Handle excess reservation + assignment.setExcessReservation(result.getContainerToBeUnreserved()); + + // If we allocated something + if (Resources.greaterThan(rc, clusterResource, + result.getResourceToBeAllocated(), Resources.none())) { + Resource allocatedResource = result.getResourceToBeAllocated(); + Container updatedContainer = result.getUpdatedContainer(); + + assignment.setResource(allocatedResource); + assignment.setType(result.getContainerNodeType()); + + if (result.getAllocationState() == AllocationState.RESERVED) { + // This is a reserved container + LOG.info("Reserved container " + " application=" + getApplicationId() + + " resource=" + allocatedResource + " queue=" + + this.toString() + " cluster=" + clusterResource); assignment.getAssignmentInformation().addReservationDetails( - container.getId(), getCSLeafQueue().getQueuePath()); + updatedContainer.getId(), getCSLeafQueue().getQueuePath()); assignment.getAssignmentInformation().incrReservations(); Resources.addTo(assignment.getAssignmentInformation().getReserved(), - request.getCapability()); - return assignment; + allocatedResource); + assignment.setFulfilledReservation(true); + } else { + // This is a new container + // Inform the ordering policy + LOG.info("assignedContainer" + " application attempt=" + + getApplicationAttemptId() + " container=" + + updatedContainer.getId() + " queue=" + this + " clusterResource=" + + clusterResource); + + getCSLeafQueue().getOrderingPolicy().containerAllocated(this, + getRMContainer(updatedContainer.getId())); + + assignment.getAssignmentInformation().addAllocationDetails( + updatedContainer.getId(), getCSLeafQueue().getQueuePath()); + assignment.getAssignmentInformation().incrAllocations(); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + allocatedResource); } - return new CSAssignment(Resources.none(), type); } + + return assignment; } - - private boolean checkHeadroom(Resource clusterResource, - ResourceLimits currentResourceLimits, Resource required, FiCaSchedulerNode node) { - // If headroom + currentReservation < required, we cannot allocate this - // require - Resource resourceCouldBeUnReserved = getCurrentReservation(); - if (!getCSLeafQueue().getReservationContinueLooking() || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { - // If we don't allow reservation continuous looking, OR we're looking at - // non-default node partition, we won't allow to unreserve before - // allocation. 
- resourceCouldBeUnReserved = Resources.none(); - } - return Resources - .greaterThanOrEqual(rc, clusterResource, Resources.add( - currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), - required); - } - + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { @@ -886,174 +507,41 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-label=" + node.getPartition()); } - return SKIP_ASSIGNMENT; + return CSAssignment.SKIP_ASSIGNMENT; } synchronized (this) { - // Check if this resource is on the blacklist - if (SchedulerAppUtils.isBlacklisted(this, node, LOG)) { - return SKIP_ASSIGNMENT; - } - // Schedule in priority order for (Priority priority : getPriorities()) { - ResourceRequest anyRequest = - getResourceRequest(priority, ResourceRequest.ANY); - if (null == anyRequest) { - continue; - } - - // Required resource - Resource required = anyRequest.getCapability(); - - // Do we need containers at this 'priority'? - if (getTotalRequiredResources(priority) <= 0) { - continue; - } - - // AM container allocation doesn't support non-exclusive allocation to - // avoid painful of preempt an AM container - if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + ContainerAllocation allocationResult = + containerAllocator.allocate(clusterResource, node, + schedulingMode, currentResourceLimits, priority, null); - RMAppAttempt rmAppAttempt = - rmContext.getRMApps() - .get(getApplicationId()).getCurrentAppAttempt(); - if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false - && null == rmAppAttempt.getMasterContainer()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip allocating AM container to app_attempt=" - + getApplicationAttemptId() - + ", don't allow to allocate AM container in non-exclusive mode"); - } - break; - } - } + // If it's a skipped allocation + AllocationState allocationState = allocationResult.getAllocationState(); - // Is the node-label-expression of this offswitch resource request - // matches the node's label? - // If not match, jump to next priority. - if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - anyRequest, node.getPartition(), schedulingMode)) { + if (allocationState == AllocationState.PRIORITY_SKIPPED) { continue; } - - if (!getCSLeafQueue().getReservationContinueLooking()) { - if (!shouldAllocOrReserveNewContainer(priority, required)) { - if (LOG.isDebugEnabled()) { - LOG.debug("doesn't need containers based on reservation algo!"); - } - continue; - } - } - - if (!checkHeadroom(clusterResource, currentResourceLimits, required, - node)) { - if (LOG.isDebugEnabled()) { - LOG.debug("cannot allocate required resource=" + required - + " because of headroom"); - } - return NULL_ASSIGNMENT; - } - - // Inform the application it is about to get a scheduling opportunity - addSchedulingOpportunity(priority); - - // Increase missed-non-partitioned-resource-request-opportunity. 
- // This is to make sure non-partitioned-resource-request will prefer - // to be allocated to non-partitioned nodes - int missedNonPartitionedRequestSchedulingOpportunity = 0; - if (anyRequest.getNodeLabelExpression().equals( - RMNodeLabelsManager.NO_LABEL)) { - missedNonPartitionedRequestSchedulingOpportunity = - addMissedNonPartitionedRequestSchedulingOpportunity(priority); - } - - if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { - // Before doing allocation, we need to check scheduling opportunity to - // make sure : non-partitioned resource request should be scheduled to - // non-partitioned partition first. - if (missedNonPartitionedRequestSchedulingOpportunity < rmContext - .getScheduler().getNumClusterNodes()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip app_attempt=" - + getApplicationAttemptId() + " priority=" - + priority - + " because missed-non-partitioned-resource-request" - + " opportunity under requred:" + " Now=" - + missedNonPartitionedRequestSchedulingOpportunity - + " required=" - + rmContext.getScheduler().getNumClusterNodes()); - } - - return SKIP_ASSIGNMENT; - } - } - - // Try to schedule - CSAssignment assignment = - assignContainersOnNode(clusterResource, node, - priority, null, schedulingMode, currentResourceLimits); - - // Did the application skip this node? - if (assignment.getSkipped()) { - // Don't count 'skipped nodes' as a scheduling opportunity! - subtractSchedulingOpportunity(priority); - continue; - } - - // Did we schedule or reserve a container? - Resource assigned = assignment.getResource(); - if (Resources.greaterThan(rc, clusterResource, - assigned, Resources.none())) { - // Don't reset scheduling opportunities for offswitch assignments - // otherwise the app will be delayed for each non-local assignment. - // This helps apps with many off-cluster requests schedule faster. - if (assignment.getType() != NodeType.OFF_SWITCH) { - if (LOG.isDebugEnabled()) { - LOG.debug("Resetting scheduling opportunities"); - } - resetSchedulingOpportunities(priority); - } - // Non-exclusive scheduling opportunity is different: we need reset - // it every time to make sure non-labeled resource request will be - // most likely allocated on non-labeled nodes first. - resetMissedNonPartitionedRequestSchedulingOpportunity(priority); - - // Done - return assignment; - } else { - // Do not assign out of order w.r.t priorities - return SKIP_ASSIGNMENT; - } + return getCSAssignmentFromAllocateResult(clusterResource, + allocationResult); } } - return SKIP_ASSIGNMENT; + // We will reach here if we skipped all priorities of the app, so we will + // skip the app. + return CSAssignment.SKIP_ASSIGNMENT; } public synchronized CSAssignment assignReservedContainer( FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource, SchedulingMode schedulingMode) { - // Do we still need this reservation? - Priority priority = rmContainer.getReservedPriority(); - if (getTotalRequiredResources(priority) == 0) { - // Release - return new CSAssignment(this, rmContainer); - } + ContainerAllocation result = + containerAllocator.allocate(clusterResource, node, + schedulingMode, new ResourceLimits(Resources.none()), + rmContainer.getReservedPriority(), rmContainer); - // Try to assign if we have sufficient resources - CSAssignment tmp = - assignContainersOnNode(clusterResource, node, priority, - rmContainer, schedulingMode, new ResourceLimits(Resources.none())); - - // Doesn't matter... 
since it's already charged for at time of reservation - // "re-reservation" is *free* - CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); - if (tmp.getAssignmentInformation().getNumAllocations() > 0) { - ret.setFulfilledReservation(true); - } - return ret; + return getCSAssignmentFromAllocateResult(clusterResource, result); } - }