Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 94094200BBE for ; Fri, 11 Nov 2016 19:57:30 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 92A98160B15; Fri, 11 Nov 2016 18:57:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CB48C160B14 for ; Fri, 11 Nov 2016 19:57:28 +0100 (CET) Received: (qmail 3921 invoked by uid 500); 11 Nov 2016 18:57:24 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 3237 invoked by uid 99); 11 Nov 2016 18:57:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Nov 2016 18:57:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52ADAF175A; Fri, 11 Nov 2016 18:57:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Fri, 11 Nov 2016 18:57:35 -0000 Message-Id: <487fc5e828a440d9a0a3e85fc0bfd3d5@git.apache.org> In-Reply-To: <52f720d75a2f444ca97fd872b173e652@git.apache.org> References: <52f720d75a2f444ca97fd872b173e652@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan archived-at: Fri, 11 Nov 2016 18:57:30 -0000 YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de3b4aac Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de3b4aac Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de3b4aac Branch: refs/heads/HDFS-7240 Commit: de3b4aac561258ad242a3c5ed1c919428893fd4c Parents: acd509d Author: Jian He Authored: Mon Nov 7 10:14:39 2016 -0800 Committer: Jian He Committed: Mon Nov 7 10:14:39 2016 -0800 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 9 + .../rmcontainer/RMContainer.java | 13 + .../rmcontainer/RMContainerImpl.java | 78 +- .../scheduler/AppSchedulingInfo.java | 168 +++- .../scheduler/SchedulerApplicationAttempt.java | 73 +- .../scheduler/activities/ActivitiesLogger.java | 17 +- .../scheduler/activities/ActivitiesManager.java | 7 +- .../scheduler/capacity/AbstractCSQueue.java | 71 ++ .../scheduler/capacity/CSAssignment.java | 33 + .../scheduler/capacity/CSQueue.java | 19 +- .../scheduler/capacity/CapacityScheduler.java | 773 ++++++++++++++----- .../CapacitySchedulerConfiguration.java | 4 + .../scheduler/capacity/LeafQueue.java | 451 ++++++----- .../scheduler/capacity/ParentQueue.java | 428 +++++----- .../allocator/AbstractContainerAllocator.java | 39 +- .../capacity/allocator/ContainerAllocation.java | 12 +- .../capacity/allocator/ContainerAllocator.java | 15 +- .../allocator/IncreaseContainerAllocator.java | 89 +-- .../allocator/RegularContainerAllocator.java | 215 +++--- .../scheduler/common/AssignmentInformation.java | 44 +- .../common/ContainerAllocationProposal.java | 111 +++ .../common/ResourceAllocationCommitter.java | 29 + .../scheduler/common/ResourceCommitRequest.java | 164 ++++ .../scheduler/common/SchedulerContainer.java | 80 ++ .../scheduler/common/fica/FiCaSchedulerApp.java | 624 ++++++++++++--- .../scheduler/fifo/FifoAppAttempt.java | 110 +++ .../scheduler/fifo/FifoScheduler.java | 55 +- .../scheduler/placement/PlacementSet.java | 65 ++ .../scheduler/placement/PlacementSetUtils.java | 36 + .../placement/ResourceRequestUpdateResult.java | 43 ++ .../placement/SchedulingPlacementSet.java | 90 +++ .../scheduler/placement/SimplePlacementSet.java | 70 ++ .../AbstractComparatorOrderingPolicy.java | 4 +- .../scheduler/policy/FairOrderingPolicy.java | 3 +- .../scheduler/policy/FifoOrderingPolicy.java | 4 +- .../FifoOrderingPolicyForPendingApps.java | 3 +- .../yarn/server/resourcemanager/MockRM.java | 47 +- .../resourcemanager/TestClientRMService.java | 2 +- .../scheduler/TestSchedulerHealth.java | 6 +- .../capacity/TestCapacityScheduler.java | 56 +- .../TestCapacitySchedulerAsyncScheduling.java | 143 ++++ .../scheduler/capacity/TestChildQueueOrder.java | 21 +- .../capacity/TestContainerAllocation.java | 45 +- .../capacity/TestContainerResizing.java | 10 +- .../scheduler/capacity/TestLeafQueue.java | 647 +++++++++++----- .../scheduler/capacity/TestParentQueue.java | 209 +++-- .../scheduler/capacity/TestReservations.java | 277 +++++-- .../scheduler/capacity/TestUtils.java | 26 + .../TestRMWebServicesSchedulerActivities.java | 8 +- 49 files changed, 4212 insertions(+), 1334 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 01b1da7..ab36a4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -574,4 +574,13 @@      + + + + + + + + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index e5d1208..a244ad8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -46,6 +47,8 @@ public interface RMContainer extends EventHandler { ContainerId getContainerId(); + void setContainerId(ContainerId containerId); + ApplicationAttemptId getApplicationAttemptId(); RMContainerState getState(); @@ -105,4 +108,14 @@ public interface RMContainer extends EventHandler { * @return If the container was allocated remotely. */ boolean isRemotelyAllocated(); + + /* + * Return reserved resource for reserved containers, return allocated resource + * for other container + */ + Resource getAllocatedOrReservedResource(); + + boolean completed(); + + NodeId getNodeId(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 706821e..4294ef0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -161,7 +161,6 @@ public class RMContainerImpl implements RMContainer, Comparable { RMContainerEvent> stateMachine; private final ReadLock readLock; private final WriteLock writeLock; - private final ContainerId containerId; private final ApplicationAttemptId appAttemptId; private final NodeId nodeId; private final Container container; @@ -224,7 +223,6 @@ public class RMContainerImpl implements RMContainer, Comparable { RMContext rmContext, long creationTime, String nodeLabelExpression, boolean isExternallyAllocated) { this.stateMachine = stateMachineFactory.make(this); - this.containerId = container.getId(); this.nodeId = nodeId; this.container = container; this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container); @@ -255,7 +253,7 @@ public class RMContainerImpl implements RMContainer, Comparable { // containers. If false, and if this container is marked as the AM, metrics // will still be published for this container, but that calculation happens // later. - if (saveNonAMContainerMetaInfo) { + if (saveNonAMContainerMetaInfo && null != container.getId()) { rmContext.getSystemMetricsPublisher().containerCreated( this, this.creationTime); } @@ -263,7 +261,7 @@ public class RMContainerImpl implements RMContainer, Comparable { @Override public ContainerId getContainerId() { - return this.containerId; + return this.container.getId(); } @Override @@ -356,8 +354,8 @@ public class RMContainerImpl implements RMContainer, Comparable { public String getDiagnosticsInfo() { try { readLock.lock(); - if (getFinishedStatus() != null) { - return getFinishedStatus().getDiagnostics(); + if (finishedStatus != null) { + return finishedStatus.getDiagnostics(); } else { return null; } @@ -374,7 +372,7 @@ public class RMContainerImpl implements RMContainer, Comparable { logURL.append(WebAppUtils.getHttpSchemePrefix(rmContext .getYarnConfiguration())); logURL.append(WebAppUtils.getRunningLogURL( - container.getNodeHttpAddress(), containerId.toString(), + container.getNodeHttpAddress(), getContainerId().toString(), user)); return logURL.toString(); } finally { @@ -386,8 +384,8 @@ public class RMContainerImpl implements RMContainer, Comparable { public int getContainerExitStatus() { try { readLock.lock(); - if (getFinishedStatus() != null) { - return getFinishedStatus().getExitStatus(); + if (finishedStatus != null) { + return finishedStatus.getExitStatus(); } else { return 0; } @@ -400,8 +398,8 @@ public class RMContainerImpl implements RMContainer, Comparable { public ContainerState getContainerState() { try { readLock.lock(); - if (getFinishedStatus() != null) { - return getFinishedStatus().getState(); + if (finishedStatus != null) { + return finishedStatus.getState(); } else { return ContainerState.RUNNING; } @@ -431,7 +429,7 @@ public class RMContainerImpl implements RMContainer, Comparable { @Override public String toString() { - return containerId.toString(); + return getContainerId().toString(); } @Override @@ -476,7 +474,7 @@ public class RMContainerImpl implements RMContainer, Comparable { } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); LOG.error("Invalid event " + event.getType() + - " on container " + this.containerId); + " on container " + this.getContainerId()); } if (oldState != getState()) { LOG.info(event.getContainerId() + " Container Transitioned from " @@ -489,10 +487,15 @@ public class RMContainerImpl implements RMContainer, Comparable { } } - public ContainerStatus getFinishedStatus() { - return finishedStatus; + public boolean completed() { + return finishedStatus != null; } - + + @Override + public NodeId getNodeId() { + return nodeId; + } + private static class BaseTransition implements SingleArcTransition { @@ -517,7 +520,7 @@ public class RMContainerImpl implements RMContainer, Comparable { report.getContainerExitStatus()); new FinishedTransition().transition(container, - new RMContainerFinishedEvent(container.containerId, status, + new RMContainerFinishedEvent(container.getContainerId(), status, RMContainerEventType.FINISHED)); return RMContainerState.COMPLETED; } else if (report.getContainerState().equals(ContainerState.RUNNING)) { @@ -654,11 +657,11 @@ public class RMContainerImpl implements RMContainer, Comparable { } else { // Something wrong happened, kill the container LOG.warn("Something wrong happened, container size reported by NM" - + " is not expected, ContainerID=" + container.containerId + + " is not expected, ContainerID=" + container.getContainerId() + " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:" + nmContainerResource); container.eventHandler.handle(new RMNodeCleanContainerEvent( - container.nodeId, container.containerId)); + container.nodeId, container.getContainerId())); } } @@ -761,7 +764,7 @@ public class RMContainerImpl implements RMContainer, Comparable { // Inform node container.eventHandler.handle(new RMNodeCleanContainerEvent( - container.nodeId, container.containerId)); + container.nodeId, container.getContainerId())); // Inform appAttempt super.transition(container, event); @@ -831,8 +834,8 @@ public class RMContainerImpl implements RMContainer, Comparable { @Override public int compareTo(RMContainer o) { - if (containerId != null && o.getContainerId() != null) { - return containerId.compareTo(o.getContainerId()); + if (getContainerId() != null && o.getContainerId() != null) { + return getContainerId().compareTo(o.getContainerId()); } return -1; } @@ -865,4 +868,35 @@ public class RMContainerImpl implements RMContainer, Comparable { public boolean isRemotelyAllocated() { return isExternallyAllocated; } + + @Override + public Resource getAllocatedOrReservedResource() { + try { + readLock.lock(); + if (getState().equals(RMContainerState.RESERVED)) { + return getReservedResource(); + } else { + return getAllocatedResource(); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void setContainerId(ContainerId containerId) { + // In some cases, for example, global scheduling. It is possible that + // container created without container-id assigned, so we will publish + // container creation event to timeline service when id assigned. + container.setId(containerId); + + // If saveNonAMContainerMetaInfo is true, store system metrics for all + // containers. If false, and if this container is marked as the AM, metrics + // will still be published for this container, but that calculation happens + // later. + if (saveNonAMContainerMetaInfo && null != container.getId()) { + rmContext.getSystemMetricsPublisher().containerCreated( + this, this.creationTime); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 59a6650..ffb1885 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -49,6 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -691,6 +696,25 @@ public class AppSchedulingInfo { } } + public List allocate(NodeType type, + SchedulerNode node, SchedulerRequestKey schedulerKey, + Container containerAllocated) { + try { + writeLock.lock(); + ResourceRequest request; + if (type == NodeType.NODE_LOCAL) { + request = resourceRequestMap.get(schedulerKey).get(node.getNodeName()); + } else if (type == NodeType.RACK_LOCAL) { + request = resourceRequestMap.get(schedulerKey).get(node.getRackName()); + } else{ + request = resourceRequestMap.get(schedulerKey).get(ResourceRequest.ANY); + } + return allocate(type, node, schedulerKey, request, containerAllocated); + } finally { + writeLock.unlock(); + } + } + /** * Resources have been allocated to this application by the resource * scheduler. Track them. @@ -701,40 +725,26 @@ public class AppSchedulingInfo { * @param containerAllocated Container Allocated * @return List of ResourceRequests */ - public List allocate(NodeType type, SchedulerNode node, - SchedulerRequestKey schedulerKey, ResourceRequest request, - Container containerAllocated) { - List resourceRequests = new ArrayList<>(); + public List allocate(NodeType type, + SchedulerNode node, SchedulerRequestKey schedulerKey, + ResourceRequest request, Container containerAllocated) { try { - this.writeLock.lock(); + writeLock.lock(); + List resourceRequests = new ArrayList<>(); if (type == NodeType.NODE_LOCAL) { allocateNodeLocal(node, schedulerKey, request, resourceRequests); } else if (type == NodeType.RACK_LOCAL) { allocateRackLocal(node, schedulerKey, request, resourceRequests); - } else { + } else{ allocateOffSwitch(request, resourceRequests, schedulerKey); } - QueueMetrics metrics = queue.getMetrics(); - if (pending) { - // once an allocation is done we assume the application is - // running from scheduler's POV. - pending = false; - metrics.runAppAttempt(applicationId, user); - } - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationId=" + applicationId - + " container=" + containerAllocated.getId() - + " host=" + containerAllocated.getNodeId().toString() - + " user=" + user - + " resource=" + request.getCapability() - + " type=" + type); + if (null != containerAllocated) { + updateMetricsForAllocatedContainer(request, type, containerAllocated); } - metrics.allocateResources(user, 1, request.getCapability(), true); - metrics.incrNodeTypeAggregations(user, type); return resourceRequests; } finally { - this.writeLock.unlock(); + writeLock.unlock(); } } @@ -942,4 +952,116 @@ public class AppSchedulingInfo { request.getRelaxLocality(), request.getNodeLabelExpression()); return newRequest; } + + /* + * In async environment, pending resource request could be updated during + * scheduling, this method checks pending request before allocating + */ + public boolean checkAllocation(NodeType type, SchedulerNode node, + SchedulerRequestKey schedulerKey) { + try { + readLock.lock(); + ResourceRequest r = resourceRequestMap.get(schedulerKey).get( + ResourceRequest.ANY); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { + r = resourceRequestMap.get(schedulerKey).get(node.getRackName()); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + if (type == NodeType.NODE_LOCAL) { + r = resourceRequestMap.get(schedulerKey).get(node.getNodeName()); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + } + } + + return true; + } finally { + readLock.unlock(); + } + } + + public void updateMetricsForAllocatedContainer( + ResourceRequest request, NodeType type, Container containerAllocated) { + try { + writeLock.lock(); + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // once an allocation is done we assume the application is + // running from scheduler's POV. + pending = false; + metrics.runAppAttempt(applicationId, user); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationId=" + applicationId + " container=" + + containerAllocated.getId() + " host=" + containerAllocated + .getNodeId().toString() + " user=" + user + " resource=" + request + .getCapability() + " type=" + type); + } + metrics.allocateResources(user, 1, request.getCapability(), true); + metrics.incrNodeTypeAggregations(user, type); + } finally { + writeLock.unlock(); + } + } + + // Get placement-set by specified schedulerKey + // Now simply return all node of the input clusterPlacementSet + // TODO, need update this when we support global scheduling + public SchedulingPlacementSet getSchedulingPlacementSet( + SchedulerRequestKey schedulerkey) { + return new SchedulingPlacementSet() { + @Override + @SuppressWarnings("unchecked") + public Iterator getPreferredNodeIterator( + PlacementSet clusterPlacementSet) { + return IteratorUtils.singletonIterator( + clusterPlacementSet.getAllNodes().values().iterator().next()); + } + + @Override + public ResourceRequestUpdateResult updateResourceRequests( + List requests, + boolean recoverPreemptedRequestForAContainer) { + return null; + } + + @Override + public Map getResourceRequests() { + return null; + } + + @Override + public ResourceRequest getResourceRequest(String resourceName, + SchedulerRequestKey requestKey) { + return null; + } + + @Override + public List allocate(NodeType type, SchedulerNode node, + ResourceRequest request) { + return null; + } + + @Override + public Map getAllNodes() { + return null; + } + + @Override + public long getVersion() { + return 0; + } + + @Override + public String getPartition() { + return null; + } + }; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index d148132..bb1d461 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -28,10 +28,12 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.collect.ConcurrentHashMultiset; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang.time.FastDateFormat; import org.apache.commons.logging.Log; @@ -49,12 +51,14 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -69,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; @@ -178,6 +185,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { protected ReentrantReadWriteLock.ReadLock readLock; protected ReentrantReadWriteLock.WriteLock writeLock; + // Not confirmed allocation resource, will be used to avoid too many proposal + // rejected because of duplicated allocation + private AtomicLong unconfirmedAllocatedMem = new AtomicLong(); + private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger(); + public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { @@ -529,6 +541,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { if (rmContainer == null) { rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), rmContext); + } + if (rmContainer.getState() == RMContainerState.NEW) { attemptResourceUsage.incReserved(node.getPartition(), container.getResource()); ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); @@ -839,16 +853,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } // used for continuous scheduling - public void resetSchedulingOpportunities( - SchedulerRequestKey schedulerKey, long currentTimeMs) { - try { - writeLock.lock(); - lastScheduledContainer.put(schedulerKey, currentTimeMs); - schedulingOpportunities.setCount(schedulerKey, 0); - } finally { - writeLock.unlock(); - } - + public void resetSchedulingOpportunities(SchedulerRequestKey schedulerKey, + long currentTimeMs) { + lastScheduledContainer.put(schedulerKey, currentTimeMs); + schedulingOpportunities.setCount(schedulerKey, 0); } @VisibleForTesting @@ -998,6 +1006,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public void incNumAllocatedContainers(NodeType containerType, NodeType requestType) { + if (containerType == null || requestType == null) { + // Sanity check + return; + } + RMAppAttempt attempt = rmContext.getRMApps().get(attemptId.getApplicationId()) .getCurrentAppAttempt(); @@ -1039,9 +1052,27 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public boolean hasPendingResourceRequest(ResourceCalculator rc, String nodePartition, Resource cluster, SchedulingMode schedulingMode) { - return SchedulerUtils.hasPendingResourceRequest(rc, - this.attemptResourceUsage, nodePartition, cluster, - schedulingMode); + // We need to consider unconfirmed allocations + if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + nodePartition = RMNodeLabelsManager.NO_LABEL; + } + + Resource pending = attemptResourceUsage.getPending(nodePartition); + + // TODO, need consider node partition here + // To avoid too many allocation-proposals rejected for non-default + // partition allocation + if (StringUtils.equals(nodePartition, RMNodeLabelsManager.NO_LABEL)) { + pending = Resources.subtract(pending, Resources + .createResource(unconfirmedAllocatedMem.get(), + unconfirmedAllocatedVcores.get())); + } + + if (Resources.greaterThan(rc, cluster, pending, Resources.none())) { + return true; + } + + return false; } @VisibleForTesting @@ -1206,6 +1237,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { this.isAttemptRecovering = isRecovering; } + public SchedulingPlacementSet getSchedulingPlacementSet( + SchedulerRequestKey schedulerRequestKey) { + return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey); + } + + + public void incUnconfirmedRes(Resource res) { + unconfirmedAllocatedMem.addAndGet(res.getMemorySize()); + unconfirmedAllocatedVcores.addAndGet(res.getVirtualCores()); + } + + public void decUnconfirmedRes(Resource res) { + unconfirmedAllocatedMem.addAndGet(-res.getMemorySize()); + unconfirmedAllocatedVcores.addAndGet(-res.getVirtualCores()); + } + /** * Different state for Application Master, user can see this state from web UI */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 8fa1bb5..3f8ed55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -25,12 +25,14 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; /** * Utility for logging scheduler activities */ +// FIXME: make sure PlacementSet works with this class public class ActivitiesLogger { private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class); @@ -112,7 +114,7 @@ public class ActivitiesLogger { */ public static void recordAppActivityWithAllocation( ActivitiesManager activitiesManager, SchedulerNode node, - SchedulerApplicationAttempt application, Container updatedContainer, + SchedulerApplicationAttempt application, RMContainer updatedContainer, ActivityState activityState) { if (activitiesManager == null) { return; @@ -122,9 +124,9 @@ public class ActivitiesLogger { // Add application-container activity into specific node allocation. activitiesManager.addSchedulingActivityForNode(node.getNodeID(), application.getApplicationId().toString(), - updatedContainer.getId().toString(), - updatedContainer.getPriority().toString(), activityState, - ActivityDiagnosticConstant.EMPTY, type); + updatedContainer.getContainer().toString(), + updatedContainer.getContainer().getPriority().toString(), + activityState, ActivityDiagnosticConstant.EMPTY, type); type = "app"; // Add queue-application activity into specific node allocation. activitiesManager.addSchedulingActivityForNode(node.getNodeID(), @@ -138,9 +140,10 @@ public class ActivitiesLogger { application.getApplicationId())) { String type = "container"; activitiesManager.addSchedulingActivityForApp( - application.getApplicationId(), updatedContainer.getId().toString(), - updatedContainer.getPriority().toString(), activityState, - ActivityDiagnosticConstant.EMPTY, type); + application.getApplicationId(), + updatedContainer.getContainerId(), + updatedContainer.getContainer().getPriority().toString(), + activityState, ActivityDiagnosticConstant.EMPTY, type); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java index 4fa5feb..af73ae3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -200,12 +200,13 @@ public class ActivitiesManager extends AbstractService { // Add queue, application or container activity into specific application // allocation. void addSchedulingActivityForApp(ApplicationId applicationId, - String containerId, String priority, ActivityState state, + ContainerId containerId, String priority, ActivityState state, String diagnostic, String type) { if (shouldRecordThisApp(applicationId)) { AppAllocation appAllocation = appsAllocation.get(applicationId); - appAllocation.addAppAllocationActivity(containerId, priority, state, - diagnostic, type); + appAllocation.addAppAllocationActivity(containerId == null ? + "Container-Id-Not-Assigned" : + containerId.toString(), priority, state, diagnostic, type); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 096f5ea..7e18b29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,6 +55,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -738,4 +745,68 @@ public abstract class AbstractCSQueue implements CSQueue { return csContext.getPreemptionManager().getKillableContainers(queueName, partition); } + + @VisibleForTesting + @Override + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits resourceLimits, + SchedulingMode schedulingMode) { + return assignContainers(clusterResource, new SimplePlacementSet<>(node), + resourceLimits, schedulingMode); + } + + @Override + public boolean accept(Resource cluster, + ResourceCommitRequest request) { + // Do we need to check parent queue before making this decision? + boolean checkParentQueue = false; + + ContainerAllocationProposal allocation = + request.getFirstAllocatedOrReservedContainer(); + SchedulerContainer schedulerContainer = + allocation.getAllocatedOrReservedContainer(); + + // Do not check when allocating new container from a reserved container + if (allocation.getAllocateFromReservedContainer() == null) { + Resource required = allocation.getAllocatedOrReservedResource(); + Resource netAllocated = Resources.subtract(required, + request.getTotalReleasedResource()); + + try { + readLock.lock(); + + String partition = schedulerContainer.getNodePartition(); + Resource maxResourceLimit; + if (allocation.getSchedulingMode() + == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + maxResourceLimit = getQueueMaxResource(partition, cluster); + } else{ + maxResourceLimit = labelManager.getResourceByLabel( + schedulerContainer.getNodePartition(), cluster); + } + if (!Resources.fitsIn(resourceCalculator, cluster, + Resources.add(queueUsage.getUsed(partition), netAllocated), + maxResourceLimit)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Used resource=" + queueUsage.getUsed(partition) + + " exceeded maxResourceLimit of the queue =" + + maxResourceLimit); + } + return false; + } + } finally { + readLock.unlock(); + } + + // Only check parent queue when something new allocated or reserved. + checkParentQueue = true; + } + + + if (parent != null && checkParentQueue) { + return parent.accept(cluster, request); + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 7bea9af..2cae9a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -38,7 +38,11 @@ public class CSAssignment { new CSAssignment(SkippedType.OTHER); private Resource resource; + // Container allocation locality type private NodeType type; + + // Pending request locality type + private NodeType requestLocalityType; private RMContainer excessReservation; private FiCaSchedulerApp application; private SkippedType skipped; @@ -57,6 +61,10 @@ public class CSAssignment { private boolean increaseAllocation; private List containersToKill; + // Set when fulfilledReservation = true + private RMContainer fulfilledReservedContainer; + private SchedulingMode schedulingMode; + public CSAssignment(Resource resource, NodeType type) { this(resource, type, null, null, SkippedType.NONE, false); } @@ -173,4 +181,29 @@ public class CSAssignment { public List getContainersToKill() { return containersToKill; } + + public RMContainer getFulfilledReservedContainer() { + return fulfilledReservedContainer; + } + + public void setFulfilledReservedContainer( + RMContainer fulfilledReservedContainer) { + this.fulfilledReservedContainer = fulfilledReservedContainer; + } + + public SchedulingMode getSchedulingMode() { + return schedulingMode; + } + + public void setSchedulingMode(SchedulingMode schedulingMode) { + this.schedulingMode = schedulingMode; + } + + public NodeType getRequestLocalityType() { + return requestLocalityType; + } + + public void setRequestLocalityType(NodeType requestLocalityType) { + this.requestLocalityType = requestLocalityType; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index daf7790..e5cbd04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.security.AccessControlException; @@ -42,8 +43,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; /** * CSQueue represents a node in the tree of @@ -195,14 +199,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { /** * Assign containers to applications in the queue or it's children (if any). * @param clusterResource the resource of the cluster. - * @param node node on which resources are available + * @param ps {@link PlacementSet} of nodes which resources are available * @param resourceLimits how much overall resource of this queue can use. * @param schedulingMode Type of exclusive check when assign container on a * NodeManager, see {@link SchedulingMode}. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits resourceLimits, + PlacementSet ps, ResourceLimits resourceLimits, SchedulingMode schedulingMode); /** @@ -340,4 +344,15 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { * @return valid node labels */ public Set getNodeLabelsForQueue(); + + @VisibleForTesting + CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits resourceLimits, + SchedulingMode schedulingMode); + + boolean accept(Resource cluster, + ResourceCommitRequest request); + + void apply(Resource cluster, + ResourceCommitRequest request); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org