From: subru@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 08 Nov 2016 22:46:23 -0000
Message-Id: <1564f9a4672e477d9f07f103c1a22117@git.apache.org>
Subject: [18/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler
 interface definition and update CapacityScheduler to use it. Contributed by
 Wangda Tan

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d759d47..7e98f10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
@@ -112,7 +114,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Alloca
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceAllocationCommitter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -128,6 +134,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -142,11 +151,12 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("unchecked")
 public class CapacityScheduler extends
     AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
-    PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
+    PreemptableResourceScheduler, CapacitySchedulerContext, Configurable,
+    ResourceAllocationCommitter {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
   private YarnAuthorizationProvider authorizer;
-  
+
   private CSQueue root;
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@@ -155,6 +165,8 @@ public class CapacityScheduler extends
 
   private volatile boolean isLazyPreemptionEnabled = false;
 
+  private int offswitchPerHeartbeatLimit;
+
   static final Comparator<CSQueue> nonPartitionedQueueComparator =
       new Comparator<CSQueue>() {
     @Override
@@ -176,7 +188,7 @@ public class CapacityScheduler extends
   public void setConf(Configuration conf) {
     yarnConf = conf;
   }
-  
+
   private void validateConf(Configuration conf) {
     // validate scheduler memory allocation setting
     int minMem = conf.getInt(
@@ -229,7 +241,8 @@ public class CapacityScheduler extends
   private boolean usePortForNodeName;
 
   private boolean scheduleAsynchronously;
-  private AsyncScheduleThread asyncSchedulerThread;
+  private List<AsyncScheduleThread> asyncSchedulerThreads;
+  private ResourceCommitterService resourceCommitterService;
   private RMNodeLabelsManager labelManager;
 
   /**
@@ -253,7 +266,7 @@ public class CapacityScheduler extends
   public CSQueue getRootQueue() {
     return root;
   }
-  
+
   @Override
   public CapacitySchedulerConfiguration getConfiguration() {
     return conf;
@@ -269,11 +282,16 @@ public class CapacityScheduler extends
     return calculator;
   }
 
+  @VisibleForTesting
+  public void setResourceCalculator(ResourceCalculator rc) {
+    this.calculator = rc;
+  }
+
   @Override
   public Comparator<CSQueue> getNonPartitionedQueueComparator() {
     return nonPartitionedQueueComparator;
   }
-  
+
   @Override
   public PartitionedQueueComparator getPartitionedQueueComparator() {
     return partitionedQueueComparator;
@@ -294,7 +312,8 @@ public class CapacityScheduler extends
     this.rmContext = rmContext;
   }
 
-  private void initScheduler(Configuration configuration) throws
+  @VisibleForTesting
+  void initScheduler(Configuration configuration) throws
       IOException {
     try {
       writeLock.lock();
@@ -315,10 +334,24 @@ public class CapacityScheduler extends
       scheduleAsynchronously = this.conf.getScheduleAynschronously();
       asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
           DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+
+      // number of threads for async scheduling
+      int maxAsyncSchedulingThreads = this.conf.getInt(
+          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+          1);
+      maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
+
       if (scheduleAsynchronously) {
-        asyncSchedulerThread = new AsyncScheduleThread(this);
+        asyncSchedulerThreads = new ArrayList<>();
+        for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
+          asyncSchedulerThreads.add(new AsyncScheduleThread(this));
+        }
+        resourceCommitterService = new ResourceCommitterService(this);
       }
 
+      // Setup how many containers we can allocate for each round
+      offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
+
       LOG.info("Initialized CapacityScheduler with " + "calculator="
           + getResourceCalculator().getClass() + ", " + "minimumAllocation=<"
           + getMinimumResourceCapability() + ">, " + "maximumAllocation=<"
@@ -335,9 +368,13 @@ public class CapacityScheduler extends
       writeLock.lock();
       activitiesManager.start();
       if (scheduleAsynchronously) {
-        Preconditions.checkNotNull(asyncSchedulerThread,
-            "asyncSchedulerThread is null");
-        asyncSchedulerThread.start();
+        Preconditions.checkNotNull(asyncSchedulerThreads,
+            "asyncSchedulerThreads is null");
+        for (Thread t : asyncSchedulerThreads) {
+          t.start();
+        }
+
+        resourceCommitterService.start();
       }
     } finally {
       writeLock.unlock();
@@ -361,9 +398,13 @@ public class CapacityScheduler extends
   public void serviceStop() throws Exception {
     try {
       writeLock.lock();
-      if (scheduleAsynchronously && asyncSchedulerThread != null) {
-        asyncSchedulerThread.interrupt();
-        asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
+      if (scheduleAsynchronously && asyncSchedulerThreads != null) {
+        for (Thread t : asyncSchedulerThreads) {
+          t.interrupt();
+          t.join(THREAD_JOIN_TIMEOUT_MS);
+        }
+        resourceCommitterService.interrupt();
+        resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
       }
     } finally {
       writeLock.unlock();
@@ -393,17 +434,20 @@ public class CapacityScheduler extends
 
       // update lazy preemption
       this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
+
+      // Setup how many containers we can allocate for each round
+      offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
     } finally {
       writeLock.unlock();
     }
   }
-  
+
   long getAsyncScheduleInterval() {
     return asyncScheduleInterval;
   }
 
   private final static Random random = new Random(System.currentTimeMillis());
-  
+
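Aside: the hunks above replace the single AsyncScheduleThread with a
configurable pool of threads plus a committer thread, all stopped via
interrupt followed by a bounded join. A minimal, self-contained sketch of
that lifecycle (hypothetical names, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    public class DemoSchedulerLifecycle {
      private static final long THREAD_JOIN_TIMEOUT_MS = 1000;
      private final List<Thread> workers = new ArrayList<>();

      public void start(int nThreads) {
        // Mirror initScheduler(): never fewer than one scheduling thread.
        for (int i = 0; i < Math.max(nThreads, 1); i++) {
          Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
              // one scheduling pass would run here
            }
          });
          t.setDaemon(true);
          t.start();
          workers.add(t);
        }
      }

      public void stop() throws InterruptedException {
        for (Thread t : workers) {
          t.interrupt();
          // Bounded wait, mirroring serviceStop(): don't hang shutdown on a
          // worker that is slow to observe the interrupt.
          t.join(THREAD_JOIN_TIMEOUT_MS);
        }
      }
    }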
   /**
    * Schedule on all nodes by starting at a random point.
    * @param cs
@@ -413,20 +457,22 @@ public class CapacityScheduler extends
     int current = 0;
     Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
     int start = random.nextInt(nodes.size());
+
     for (FiCaSchedulerNode node : nodes) {
       if (current++ >= start) {
-        cs.allocateContainersToNode(node);
+        cs.allocateContainersToNode(node.getNodeID(), false);
       }
     }
     // Now, just get everyone to be safe
     for (FiCaSchedulerNode node : nodes) {
-      cs.allocateContainersToNode(node);
+      cs.allocateContainersToNode(node.getNodeID(), false);
     }
+
     try {
       Thread.sleep(cs.getAsyncScheduleInterval());
     } catch (InterruptedException e) {}
   }
-  
+
   static class AsyncScheduleThread extends Thread {
 
     private final CapacityScheduler cs;
@@ -440,12 +486,19 @@ public class CapacityScheduler extends
     @Override
     public void run() {
       while (true) {
-        if (!runSchedules.get()) {
-          try {
+        try {
+          if (!runSchedules.get() || Thread.currentThread().isInterrupted()) {
             Thread.sleep(100);
-          } catch (InterruptedException ie) {}
-        } else {
-          schedule(cs);
+          } else {
+            // Don't run schedule if we have some pending backlogs already
+            if (cs.getAsyncSchedulingPendingBacklogs() > 100) {
+              Thread.sleep(1);
+            } else{
+              schedule(cs);
+            }
+          }
+        } catch (InterruptedException ie) {
+          // Do nothing
         }
       }
     }
@@ -460,6 +513,46 @@ public class CapacityScheduler extends
 
   }
 
+  static class ResourceCommitterService extends Thread {
+    private final CapacityScheduler cs;
+    private BlockingQueue<ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>>
+        backlogs = new LinkedBlockingQueue<>();
+
+    public ResourceCommitterService(CapacityScheduler cs) {
+      this.cs = cs;
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
+              backlogs.take();
+
+          try {
+            cs.writeLock.lock();
+            cs.tryCommit(cs.getClusterResource(), request);
+          } finally {
+            cs.writeLock.unlock();
+          }
+
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+    }
+
+    public void addNewCommitRequest(
+        ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> proposal) {
+      backlogs.add(proposal);
+    }
+
+    public int getPendingBacklogs() {
+      return backlogs.size();
+    }
+  }
+
   static class QueueHook {
     public CSQueue hook(CSQueue queue) {
       return queue;
     }
   }
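Aside: ResourceCommitterService above is a classic single-consumer commit
queue. Scheduler threads produce allocation proposals, and one committer
thread drains a LinkedBlockingQueue and commits each proposal under the
scheduler write lock, decoupling proposal generation from commitment. A
stripped-down sketch of the same shape (hypothetical names; Runnable stands
in for a commit request):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class DemoCommitter extends Thread {
      private final BlockingQueue<Runnable> backlogs =
          new LinkedBlockingQueue<>();

      public DemoCommitter() {
        setDaemon(true);
      }

      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            // Blocks until a proposal arrives, then commits it.
            backlogs.take().run();
          } catch (InterruptedException e) {
            // Restore the interrupt flag so the loop condition sees it.
            Thread.currentThread().interrupt();
          }
        }
      }

      public void addNewCommitRequest(Runnable proposal) {
        backlogs.add(proposal);
      }

      public int getPendingBacklogs() {
        return backlogs.size();
      }
    }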
@@ -507,14 +600,14 @@ public class CapacityScheduler extends
 
   private void updatePlacementRules() throws IOException {
     List<PlacementRule> placementRules = new ArrayList<>();
-    
+
     // Initialize UserGroupMappingPlacementRule
     // TODO, need make this defineable by configuration.
     UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule();
     if (null != ugRule) {
       placementRules.add(ugRule);
     }
-    
+
     rmContext.getQueuePlacementManager().updateRules(placementRules);
   }
 
@@ -522,8 +615,8 @@ public class CapacityScheduler extends
   private void initializeQueues(CapacitySchedulerConfiguration conf)
     throws IOException {
 
-    root = 
-        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
+    root =
+        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
             queues, queues, noop);
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     LOG.info("Initialized root queue " + root);
@@ -539,16 +632,16 @@ public class CapacityScheduler extends
       throws IOException {
     // Parse new queues
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
-    CSQueue newRoot = 
+    CSQueue newRoot =
         parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT,
             newQueues, queues, noop);
-    
+
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
 
     // Add new queues
     addNewQueues(queues, newQueues);
-    
+
     // Re-configure queues
     root.reinitialize(newRoot, getClusterResource());
     updatePlacementRules();
@@ -592,14 +685,14 @@ public class CapacityScheduler extends
    */
   @Lock(CapacityScheduler.class)
   private void validateExistingQueues(
-      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) 
+      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
   throws IOException {
     // check that all static queues are included in the newQueues list
     for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
       if (!(e.getValue() instanceof ReservationQueue)) {
         String queueName = e.getKey();
         CSQueue oldQueue = e.getValue();
-        CSQueue newQueue = newQueues.get(queueName); 
+        CSQueue newQueue = newQueues.get(queueName);
         if (null == newQueue) {
           throw new IOException(queueName + " cannot be found during refresh!");
         } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
@@ -619,7 +712,7 @@ public class CapacityScheduler extends
    */
   @Lock(CapacityScheduler.class)
   private void addNewQueues(
-      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) 
+      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
   {
     for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
       String queueName = e.getKey();
@@ -629,19 +722,19 @@ public class CapacityScheduler extends
       }
     }
   }
-  
+
   @Lock(CapacityScheduler.class)
   static CSQueue parseQueue(
       CapacitySchedulerContext csContext,
-      CapacitySchedulerConfiguration conf, 
+      CapacitySchedulerConfiguration conf,
       CSQueue parent, String queueName, Map<String, CSQueue> queues,
-      Map<String, CSQueue> oldQueues, 
+      Map<String, CSQueue> oldQueues,
       QueueHook hook) throws IOException {
     CSQueue queue;
     String fullQueueName = (parent == null) ? queueName
         : (parent.getQueuePath() + "." + queueName);
-    String[] childQueueNames = 
+    String[] childQueueNames =
        conf.getQueues(fullQueueName);
     boolean isReservableQueue = conf.isReservable(fullQueueName);
     if (childQueueNames == null || childQueueNames.length == 0) {
@@ -676,8 +769,8 @@ public class CapacityScheduler extends
     } else {
       List<CSQueue> childQueues = new ArrayList<CSQueue>();
       for (String childQueueName : childQueueNames) {
-        CSQueue childQueue = 
-            parseQueue(csContext, conf, queue, childQueueName, 
+        CSQueue childQueue =
+            parseQueue(csContext, conf, queue, childQueueName,
               queues, oldQueues, hook);
         childQueues.add(childQueue);
       }
@@ -960,6 +1053,9 @@ public class CapacityScheduler extends
   private LeafQueue updateIncreaseRequests(
       List increaseRequests,
       FiCaSchedulerApp app) {
+    // When application has some pending to-be-removed resource requests,
+    app.removedToBeRemovedIncreaseRequests();
+
     if (null == increaseRequests || increaseRequests.isEmpty()) {
       return null;
     }
@@ -1068,8 +1164,8 @@ public class CapacityScheduler extends
 
   @Override
   @Lock(Lock.NoLock.class)
-  public QueueInfo getQueueInfo(String queueName, 
-      boolean includeChildQueues, boolean recursive) 
+  public QueueInfo getQueueInfo(String queueName,
+      boolean includeChildQueues, boolean recursive)
   throws IOException {
     CSQueue queue = null;
     queue = this.queues.get(queueName);
@@ -1094,20 +1190,33 @@ public class CapacityScheduler extends
   }
 
   @Override
-  protected synchronized void nodeUpdate(RMNode nm) {
+  protected synchronized void nodeUpdate(RMNode rmNode) {
     try {
-      writeLock.lock();
+      readLock.lock();
       setLastNodeUpdateTime(Time.now());
-      super.nodeUpdate(nm);
-      if (!scheduleAsynchronously) {
+      super.nodeUpdate(rmNode);
+    } finally {
+      readLock.unlock();
+    }
+
+    // Try to do scheduling
+    if (!scheduleAsynchronously) {
+      try {
+        writeLock.lock();
         ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
-            nm.getNodeID());
-        allocateContainersToNode(getNode(nm.getNodeID()));
+            rmNode.getNodeID());
+
+        // reset allocation and reservation stats before we start doing any
+        // work
+        updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(),
+            CSAssignment.NULL_ASSIGNMENT);
+
+        allocateContainersToNode(rmNode.getNodeID(), true);
         ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
-            nm.getNodeID());
+            rmNode.getNodeID());
+      } finally {
+        writeLock.unlock();
       }
-    } finally {
-      writeLock.unlock();
     }
   }
@@ -1174,10 +1283,8 @@ public class CapacityScheduler extends
     node.updateLabels(newLabels);
   }
 
-  private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
+  private void updateSchedulerHealth(long now, NodeId nodeId,
       CSAssignment assignment) {
-
-    NodeId nodeId = node.getNodeID();
     List<AssignmentInformation.AssignmentDetails> allocations =
         assignment.getAssignmentInformation().getAllocationDetails();
     List<AssignmentInformation.AssignmentDetails> reservations =
@@ -1203,137 +1310,248 @@ public class CapacityScheduler extends
     schedulerHealth.updateSchedulerRunDetails(now, assignment
         .getAssignmentInformation().getAllocated(), assignment
         .getAssignmentInformation().getReserved());
-  }
-
-  @VisibleForTesting
-  public void allocateContainersToNode(FiCaSchedulerNode node) {
-    try {
-      writeLock.lock();
-      if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
-          .isSchedulerReadyForAllocatingContainers()) {
-        return;
-      }
+  }
 
-      if (!nodeTracker.exists(node.getNodeID())) {
-        LOG.info("Skipping scheduling as the node " + node.getNodeID()
-            + " has been removed");
-        return;
+  private boolean canAllocateMore(CSAssignment assignment, int offswitchCount) {
+    if (null != assignment && Resources.greaterThan(getResourceCalculator(),
+        getClusterResource(), assignment.getResource(), Resources.none())
+        && offswitchCount < offswitchPerHeartbeatLimit) {
+      // And it should not be a reserved container
+      if (assignment.getAssignmentInformation().getNumReservations() == 0) {
+        return true;
+      }
+    }
 
-      // reset allocation and reservation stats before we start doing any work
-      updateSchedulerHealth(lastNodeUpdateTime, node,
-          new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
-
-      CSAssignment assignment;
+    return false;
+  }
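Aside: canAllocateMore() above gates a loop (in the method that follows) that
keeps assigning containers on the same heartbeat until an assignment comes
back empty, a reservation is made, or the off-switch limit is hit. A reduced
sketch of that control flow (hypothetical names; tryAssign stands in for one
scheduling pass returning the locality of what it assigned, or null when
nothing was assigned):

    import java.util.function.Supplier;

    public class DemoHeartbeatLoop {
      enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

      static int allocateUpToLimit(Supplier<NodeType> tryAssign,
          int offswitchLimit) {
        int offswitchCount = 0;
        NodeType assigned = tryAssign.get();
        // Keep allocating on the same heartbeat while something was assigned
        // and we have not exhausted the per-heartbeat off-switch budget.
        while (assigned != null && offswitchCount < offswitchLimit) {
          if (assigned == NodeType.OFF_SWITCH) {
            offswitchCount++;
          }
          assigned = tryAssign.get();
        }
        return offswitchCount;
      }
    }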
 
-      // Assign new containers...
-      // 1. Check for reserved applications
-      // 2. Schedule if there are no reservations
+  /**
+   * We need to make sure when doing allocation, Node should be existed
+   * And we will construct a {@link PlacementSet} before proceeding
+   */
+  private void allocateContainersToNode(NodeId nodeId,
+      boolean withNodeHeartbeat) {
+    FiCaSchedulerNode node = getNode(nodeId);
+    if (null != node) {
+      int offswitchCount = 0;
+
+      PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
+      CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
+      // Only check if we can allocate more container on the same node when
+      // scheduling is triggered by node heartbeat
+      if (null != assignment && withNodeHeartbeat) {
+        if (assignment.getType() == NodeType.OFF_SWITCH) {
+          offswitchCount++;
+        }
 
-      RMContainer reservedContainer = node.getReservedContainer();
-      if (reservedContainer != null) {
+        while (canAllocateMore(assignment, offswitchCount)) {
+          // Try to see if it is possible to allocate multiple container for
+          // the same node heartbeat
+          assignment = allocateContainersToNode(ps, true);
 
-        FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
-            reservedContainer.getContainerId());
-
-        // Try to fulfill the reservation
-        LOG.info("Trying to fulfill reservation for application "
-            + reservedApplication.getApplicationId() + " on node: " + node
-            .getNodeID());
-
-        LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
-        assignment = queue.assignContainers(getClusterResource(), node,
-            // TODO, now we only consider limits for parent for non-labeled
-            // resources, should consider labeled resources as well.
-            new ResourceLimits(labelManager
-                .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
-                    getClusterResource())),
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-        if (assignment.isFulfilledReservation()) {
-          CSAssignment tmp = new CSAssignment(
-              reservedContainer.getReservedResource(), assignment.getType());
-          Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-              reservedContainer.getReservedResource());
-          tmp.getAssignmentInformation().addAllocationDetails(
-              reservedContainer.getContainerId(), queue.getQueuePath());
-          tmp.getAssignmentInformation().incrAllocations();
-          updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
-          schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
-
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              queue.getParent().getQueueName(), queue.getQueueName(),
-              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-          ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
-              node, reservedContainer.getContainerId(),
-              AllocationState.ALLOCATED_FROM_RESERVED);
-        } else{
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              queue.getParent().getQueueName(), queue.getQueueName(),
-              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-          ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
-              node, reservedContainer.getContainerId(),
-              AllocationState.SKIPPED);
+          if (null != assignment
+              && assignment.getType() == NodeType.OFF_SWITCH) {
+            offswitchCount++;
+          }
         }
-      }
-
-      // Try to schedule more if there are no reservations to fulfill
-      if (node.getReservedContainer() == null) {
-        if (calculator.computeAvailableContainers(Resources
-            .add(node.getUnallocatedResource(),
-                node.getTotalKillableResources()), minimumAllocation) > 0) {
+
+        if (offswitchCount >= offswitchPerHeartbeatLimit) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Trying to schedule on node: " + node.getNodeName()
-                + ", available: " + node.getUnallocatedResource());
+            LOG.debug("Assigned maximum number of off-switch containers: "
+                + offswitchCount + ", assignments so far: " + assignment);
           }
+        }
+      }
+    }
+  }
 
-          assignment = root.assignContainers(getClusterResource(), node,
-              new ResourceLimits(labelManager
-                  .getResourceByLabel(node.getPartition(),
-                      getClusterResource())),
-              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-          if (Resources.greaterThan(calculator, getClusterResource(),
-              assignment.getResource(), Resources.none())) {
-            updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
-            return;
-          }
+  /*
+   * Logics of allocate container on a single node (Old behavior)
+   */
+  private CSAssignment allocateContainerOnSingleNode(
+      PlacementSet<FiCaSchedulerNode> ps, FiCaSchedulerNode node,
+      boolean withNodeHeartbeat) {
+    // Backward compatible way to make sure previous behavior which allocation
+    // driven by node heartbeat works.
+    if (getNode(node.getNodeID()) != node) {
+      LOG.error("Trying to schedule on a removed node, please double check.");
+      return null;
+    }
 
-          // Only do non-exclusive allocation when node has node-labels.
-          if (StringUtils.equals(node.getPartition(),
-              RMNodeLabelsManager.NO_LABEL)) {
-            return;
-          }
+    CSAssignment assignment;
 
-          // Only do non-exclusive allocation when the node-label supports that
-          try {
-            if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
-                node.getPartition())) {
-              return;
-            }
-          } catch (IOException e) {
-            LOG.warn(
-                "Exception when trying to get exclusivity of node label=" + node
                    .getPartition(), e);
-            return;
-          }
+    // Assign new containers...
+    // 1. Check for reserved applications
+    // 2. Schedule if there are no reservations
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
+          reservedContainer.getContainerId());
 
-          // Try to use NON_EXCLUSIVE
-          assignment = root.assignContainers(getClusterResource(), node,
-              // TODO, now we only consider limits for parent for non-labeled
-              // resources, should consider labeled resources as well.
-              new ResourceLimits(labelManager
-                  .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
-                      getClusterResource())),
-              SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
-          updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+      // Try to fulfill the reservation
+      LOG.info(
+          "Trying to fulfill reservation for application " + reservedApplication
+              .getApplicationId() + " on node: " + node.getNodeID());
+
+      LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+      assignment = queue.assignContainers(getClusterResource(), ps,
+          // TODO, now we only consider limits for parent for non-labeled
+          // resources, should consider labeled resources as well.
+          new ResourceLimits(labelManager
+              .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+                  getClusterResource())),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+      if (assignment.isFulfilledReservation()) {
+        if (withNodeHeartbeat) {
+          // Only update SchedulerHealth in sync scheduling, existing
+          // Data structure of SchedulerHealth need to be updated for
+          // Async mode
+          updateSchedulerHealth(lastNodeUpdateTime, node.getNodeID(),
+              assignment);
         }
+
+        schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            queue.getParent().getQueueName(), queue.getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+            node, reservedContainer.getContainerId(),
+            AllocationState.ALLOCATED_FROM_RESERVED);
+      } else{
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            queue.getParent().getQueueName(), queue.getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+            node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
+      }
+
+      assignment.setSchedulingMode(
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      submitResourceCommitRequest(getClusterResource(), assignment);
+    }
+
+    // Do not schedule if there are any reservations to fulfill on the node
+    if (node.getReservedContainer() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping scheduling since node " + node.getNodeID()
+            + " is reserved by application " + node.getReservedContainer()
+            .getContainerId().getApplicationAttemptId());
       }
-      } else{
-        LOG.info("Skipping scheduling since node " + node.getNodeID()
-            + " is reserved by application " + node.getReservedContainer()
-            .getContainerId().getApplicationAttemptId());
-      }
-    } finally {
-      writeLock.unlock();
+      return null;
+    }
+
+    // First check if we can schedule
+    // When this time look at one node only, try schedule if the node
+    // has any available or killable resource
+    if (calculator.computeAvailableContainers(Resources
+        .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
+        minimumAllocation) <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("This node or this node partition doesn't have available or"
+            + "killable resource");
+      }
+      return null;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Trying to schedule on node: " + node.getNodeName() + ", available: "
+              + node.getUnallocatedResource());
+    }
+
+    return allocateOrReserveNewContainers(ps, withNodeHeartbeat);
+  }
+
+  private CSAssignment allocateOrReserveNewContainers(
+      PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
+    CSAssignment assignment = root.assignContainers(getClusterResource(), ps,
+        new ResourceLimits(labelManager
+            .getResourceByLabel(ps.getPartition(), getClusterResource())),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+    assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    submitResourceCommitRequest(getClusterResource(), assignment);
+
+    if (Resources.greaterThan(calculator, getClusterResource(),
+        assignment.getResource(), Resources.none())) {
+      if (withNodeHeartbeat) {
+        updateSchedulerHealth(lastNodeUpdateTime,
+            PlacementSetUtils.getSingleNode(ps).getNodeID(), assignment);
+      }
+      return assignment;
+    }
+
+    // Only do non-exclusive allocation when node has node-labels.
+    if (StringUtils.equals(ps.getPartition(), RMNodeLabelsManager.NO_LABEL)) {
+      return null;
+    }
+
+    // Only do non-exclusive allocation when the node-label supports that
+    try {
+      if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
+          ps.getPartition())) {
+        return null;
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception when trying to get exclusivity of node label=" + ps
+          .getPartition(), e);
+      return null;
+    }
+
+    // Try to use NON_EXCLUSIVE
+    assignment = root.assignContainers(getClusterResource(), ps,
+        // TODO, now we only consider limits for parent for non-labeled
+        // resources, should consider labeled resources as well.
+        new ResourceLimits(labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+                getClusterResource())),
+        SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
+    assignment.setSchedulingMode(SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
+    submitResourceCommitRequest(getClusterResource(), assignment);
+
+    return assignment;
+  }
+
+  /*
+   * New behavior, allocate containers considering multiple nodes
+   */
+  private CSAssignment allocateContainersOnMultiNodes(
+      PlacementSet<FiCaSchedulerNode> ps) {
+    // When this time look at multiple nodes, try schedule if the
+    // partition has any available resource or killable resource
+    if (root.getQueueCapacities().getUsedCapacity(ps.getPartition()) >= 1.0f
+        && preemptionManager.getKillableResource(
+        CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
+        .none()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("This node or this node partition doesn't have available or"
+            + "killable resource");
+      }
+      return null;
+    }
+
+    return allocateOrReserveNewContainers(ps, false);
+  }
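Aside: the entry point that follows dispatches between the old single-node
path and the new multi-node path based on whether the PlacementSet wraps
exactly one node. A simplified, hypothetical rendering of that dispatch (not
the real PlacementSet API):

    import java.util.Iterator;
    import java.util.Map;

    interface DemoPlacementSet<N> {
      Map<String, N> getAllNodes(); // host -> node, simplified
      String getPartition();
    }

    final class DemoPlacementSetUtils {
      private DemoPlacementSetUtils() {}

      // Mirrors the idea of PlacementSetUtils.getSingleNode: non-null only
      // when the set wraps exactly one node.
      static <N> N getSingleNode(DemoPlacementSet<N> ps) {
        Map<String, N> nodes = ps.getAllNodes();
        if (nodes != null && nodes.size() == 1) {
          Iterator<N> it = nodes.values().iterator();
          return it.next();
        }
        return null;
      }

      static <N> String choosePath(DemoPlacementSet<N> ps) {
        return getSingleNode(ps) != null
            ? "single-node (heartbeat-style) path"
            : "multi-node (global scheduling) path";
      }
    }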
+
+  @VisibleForTesting
+  CSAssignment allocateContainersToNode(PlacementSet<FiCaSchedulerNode> ps,
+      boolean withNodeHeartbeat) {
+    if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
+        .isSchedulerReadyForAllocatingContainers()) {
+      return null;
+    }
+
+    // Backward compatible way to make sure previous behavior which allocation
+    // driven by node heartbeat works.
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+
+    // We have two different logics to handle allocation on single node / multi
+    // nodes.
+    if (null != node) {
+      return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat);
+    } else {
+      return allocateContainersOnMultiNodes(ps);
+    }
   }
 
@@ -1356,7 +1574,7 @@ public class CapacityScheduler extends
       break;
     case NODE_RESOURCE_UPDATE:
     {
-      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = 
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
           (NodeResourceUpdateSchedulerEvent)event;
       updateNodeAndQueueResource(nodeResourceUpdatedEvent.getRMNode(),
           nodeResourceUpdatedEvent.getResourceOption());
@@ -1366,7 +1584,7 @@ public class CapacityScheduler extends
     {
       NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
           (NodeLabelsUpdateSchedulerEvent) event;
-      
+
       updateNodeLabelsAndQueueResource(labelUpdateEvent);
     }
     break;
@@ -1420,7 +1638,7 @@ public class CapacityScheduler extends
       break;
     case CONTAINER_EXPIRED:
     {
-      ContainerExpiredSchedulerEvent containerExpiredEvent = 
+      ContainerExpiredSchedulerEvent containerExpiredEvent =
           (ContainerExpiredSchedulerEvent) event;
       ContainerId containerId = containerExpiredEvent.getContainerId();
       if (containerExpiredEvent.isIncrease()) {
@@ -1515,7 +1733,9 @@ public class CapacityScheduler extends
           + clusterResource);
 
       if (scheduleAsynchronously && getNumClusterNodes() == 1) {
-        asyncSchedulerThread.beginSchedule();
+        for (AsyncScheduleThread t : asyncSchedulerThreads) {
+          t.beginSchedule();
+        }
       }
     } finally {
       writeLock.unlock();
     }
@@ -1561,7 +1781,9 @@ public class CapacityScheduler extends
     int numNodes = nodeTracker.nodeCount();
 
     if (scheduleAsynchronously && numNodes == 0) {
-      asyncSchedulerThread.suspendSchedule();
+      for (AsyncScheduleThread t : asyncSchedulerThreads) {
+        t.suspendSchedule();
+      }
     }
 
     LOG.info(
@@ -1629,7 +1851,7 @@ public class CapacityScheduler extends
     queue.completedContainer(getClusterResource(), application, node,
         rmContainer, containerStatus, event, null, true);
   }
-  
+
   @Override
   protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
       SchedulerApplicationAttempt attempt) {
@@ -1673,7 +1895,7 @@ public class CapacityScheduler extends
   public List<FiCaSchedulerNode> getAllNodes() {
     return nodeTracker.getAllNodes();
   }
-  
+
   @Override
   @Lock(Lock.NoLock.class)
   public void recover(RMState state) throws Exception {
@@ -2085,7 +2307,7 @@ public class CapacityScheduler extends
     }
     return EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
   }
-  
+
   @Override
   public Resource getMaximumResourceCapability(String queueName) {
     CSQueue queue = getQueue(queueName);
@@ -2223,4 +2445,207 @@ public class CapacityScheduler extends
   public ResourceUsage getClusterResourceUsage() {
     return root.getQueueResourceUsage();
   }
+
+  private SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+      getSchedulerContainer(RMContainer rmContainer, boolean allocated) {
+    if (null == rmContainer) {
+      return null;
+    }
+
+    FiCaSchedulerApp app = getApplicationAttempt(
+        rmContainer.getApplicationAttemptId());
+    if (null == app) { return null; }
+
+    NodeId nodeId;
+    // Get nodeId
+    if (rmContainer.getState() == RMContainerState.RESERVED) {
+      nodeId = rmContainer.getReservedNode();
+    } else {
+      nodeId = rmContainer.getNodeId();
+    }
+
+    FiCaSchedulerNode node = getNode(nodeId);
+    if (null == node) {
+      return null;
+    }
+    return new SchedulerContainer<>(app, node, rmContainer,
+        // TODO, node partition should come from CSAssignment to avoid partition
+        // get updated before submitting the commit
+        node.getPartition(), allocated);
+  }
+
+  private List<SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>>
+      getSchedulerContainersToRelease(
+      CSAssignment csAssignment) {
+    List<SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>> list = null;
+
+    if (csAssignment.getContainersToKill() != null && !csAssignment
+        .getContainersToKill().isEmpty()) {
+      list = new ArrayList<>();
+      for (RMContainer rmContainer : csAssignment.getContainersToKill()) {
+        list.add(getSchedulerContainer(rmContainer, false));
+      }
+    }
+
+    if (csAssignment.getExcessReservation() != null) {
+      if (null == list) {
+        list = new ArrayList<>();
+      }
+      list.add(
+          getSchedulerContainer(csAssignment.getExcessReservation(), false));
+    }
+
+    return list;
+  }
+
+  @VisibleForTesting
+  public void submitResourceCommitRequest(Resource cluster,
+      CSAssignment csAssignment) {
+    ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
+        createResourceCommitRequest(csAssignment);
+
+    if (null == request) {
+      return;
+    }
+
+    if (scheduleAsynchronously) {
+      // Submit to a commit thread and commit it async-ly
+      resourceCommitterService.addNewCommitRequest(request);
+    } else{
+      // Otherwise do it sync-ly.
+      tryCommit(cluster, request);
+    }
+  }
+
+  @VisibleForTesting
+  public ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
+      createResourceCommitRequest(CSAssignment csAssignment) {
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocated =
+        null;
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> reserved =
+        null;
+    List<SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>> released =
+        null;
+
+    if (Resources.greaterThan(calculator, getClusterResource(),
+        csAssignment.getResource(), Resources.none())) {
+      // Allocated something
+      List<AssignmentInformation.AssignmentDetails> allocations =
+          csAssignment.getAssignmentInformation().getAllocationDetails();
+      if (!allocations.isEmpty()) {
+        RMContainer rmContainer = allocations.get(0).rmContainer;
+        allocated = new ContainerAllocationProposal<>(
+            getSchedulerContainer(rmContainer, true),
+            getSchedulerContainersToRelease(csAssignment),
+            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
+                false), csAssignment.isIncreasedAllocation(),
+            csAssignment.getType(), csAssignment.getRequestLocalityType(),
+            csAssignment.getSchedulingMode() != null ?
+                csAssignment.getSchedulingMode() :
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+            csAssignment.getResource());
+      }
+
+      // Reserved something
+      List<AssignmentInformation.AssignmentDetails> reservation =
+          csAssignment.getAssignmentInformation().getReservationDetails();
+      if (!reservation.isEmpty()) {
+        RMContainer rmContainer = reservation.get(0).rmContainer;
+        reserved = new ContainerAllocationProposal<>(
+            getSchedulerContainer(rmContainer, false),
+            getSchedulerContainersToRelease(csAssignment),
+            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
+                false), csAssignment.isIncreasedAllocation(),
+            csAssignment.getType(), csAssignment.getRequestLocalityType(),
+            csAssignment.getSchedulingMode() != null ?
+                csAssignment.getSchedulingMode() :
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+            csAssignment.getResource());
+      }
+    }
+
+    // When we don't need to allocate/reserve anything, we can feel free to
+    // kill all to-release containers in the request.
+    if (null == allocated && null == reserved) {
+      released = getSchedulerContainersToRelease(csAssignment);
+    }
+
+    if (null != allocated || null != reserved || (null != released && !released
+        .isEmpty())) {
+      List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
+          allocationsList = null;
+      if (allocated != null) {
+        allocationsList = new ArrayList<>();
+        allocationsList.add(allocated);
+      }
+
+      List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
+          reservationsList = null;
+      if (reserved != null) {
+        reservationsList = new ArrayList<>();
+        reservationsList.add(reserved);
+      }
+
+      return new ResourceCommitRequest<>(allocationsList, reservationsList,
+          released);
+    }
+
+    return null;
+  }
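Aside: createResourceCommitRequest() above reduces one CSAssignment to at
most one new allocation, at most one new reservation, and a list of
containers to release; releases stand alone only when nothing was allocated
or reserved. A sketch of that value shape (hypothetical types):

    import java.util.Collections;
    import java.util.List;

    class DemoCommitRequest<C> {
      final List<C> toAllocate;
      final List<C> toReserve;
      final List<C> toRelease;

      DemoCommitRequest(List<C> toAllocate, List<C> toReserve,
          List<C> toRelease) {
        this.toAllocate = orEmpty(toAllocate);
        this.toReserve = orEmpty(toReserve);
        this.toRelease = orEmpty(toRelease);
      }

      boolean anythingAllocatedOrReserved() {
        return !toAllocate.isEmpty() || !toReserve.isEmpty();
      }

      private static <T> List<T> orEmpty(List<T> l) {
        return l == null ? Collections.<T>emptyList() : l;
      }
    }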
+
+  @Override
+  public void tryCommit(Resource cluster, ResourceCommitRequest r) {
+    ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
+        (ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
+
+    ApplicationAttemptId attemptId = null;
+
+    // We need to update unconfirmed allocated resource of application when
+    // any container allocated.
+    boolean updateUnconfirmedAllocatedResource =
+        request.getContainersToAllocate() != null && !request
+            .getContainersToAllocate().isEmpty();
+
+    // find the application to accept and apply the ResourceCommitRequest
+    if (request.anythingAllocatedOrReserved()) {
+      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> c =
+          request.getFirstAllocatedOrReservedContainer();
+      attemptId =
+          c.getAllocatedOrReservedContainer().getSchedulerApplicationAttempt()
+              .getApplicationAttemptId();
+    } else {
+      if (!request.getContainersToRelease().isEmpty()) {
+        attemptId = request.getContainersToRelease().get(0)
+            .getSchedulerApplicationAttempt().getApplicationAttemptId();
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Try to commit allocation proposal=" + request);
+    }
+
+    if (attemptId != null) {
+      FiCaSchedulerApp app = getApplicationAttempt(attemptId);
+      if (app != null) {
+        if (app.accept(cluster, request)) {
+          app.apply(cluster, request);
+          LOG.info("Allocation proposal accepted");
+        } else{
+          LOG.info("Failed to accept allocation proposal");
+        }
+
+        // Update unconfirmed allocated resource.
+        if (updateUnconfirmedAllocatedResource) {
+          app.decUnconfirmedRes(request.getTotalAllocatedResource());
+        }
+      }
+    }
+  }
+
+  public int getAsyncSchedulingPendingBacklogs() {
+    if (scheduleAsynchronously) {
+      return resourceCommitterService.getPendingBacklogs();
+    }
+    return 0;
+  }
 }
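Aside: tryCommit() above is a two-phase commit. accept() validates a
proposal against current state without modifying anything, and only an
accepted proposal is applied; rejected proposals are simply dropped. That is
what makes it safe for multiple scheduling threads to generate proposals
optimistically. The protocol in miniature (hypothetical names):

    interface DemoProposalTarget<R> {
      boolean accept(R request); // read-only validation
      void apply(R request);     // state mutation
    }

    class DemoTwoPhaseCommit {
      static <R> boolean tryCommit(DemoProposalTarget<R> app, R request) {
        if (app.accept(request)) {
          app.apply(request);
          return true;  // "Allocation proposal accepted"
        }
        return false;   // "Failed to accept allocation proposal"
      }
    }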

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index cea5aa4..c153c26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -209,6 +209,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable";
 
   @Private
+  public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD =
+      SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-threads";
+
+  @Private
   public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
 
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 8941fdf..161957f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -67,8 +67,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -914,54 +919,6 @@ public class LeafQueue extends AbstractCSQueue {
       ApplicationAttemptId applicationAttemptId) {
     return applicationAttemptMap.get(applicationAttemptId);
   }
-  
-  private void handleExcessReservedContainer(Resource clusterResource,
-      CSAssignment assignment, FiCaSchedulerNode node, FiCaSchedulerApp app) {
-    if (assignment.getExcessReservation() != null) {
-      RMContainer excessReservedContainer = assignment.getExcessReservation();
-
-      if (excessReservedContainer.hasIncreaseReservation()) {
-        unreserveIncreasedContainer(clusterResource,
-            app, node, excessReservedContainer);
-      } else {
-        completedContainer(clusterResource, assignment.getApplication(),
-            scheduler.getNode(excessReservedContainer.getAllocatedNode()),
-            excessReservedContainer,
-            SchedulerUtils.createAbnormalContainerStatus(
-                excessReservedContainer.getContainerId(),
-                SchedulerUtils.UNRESERVED_CONTAINER),
-            RMContainerEventType.RELEASED, null, false);
-      }
-
-      assignment.setExcessReservation(null);
-    }
-  }
-
-  private void killToPreemptContainers(Resource clusterResource,
-      FiCaSchedulerNode node,
-      CSAssignment assignment) {
-    if (assignment.getContainersToKill() != null) {
-      StringBuilder sb = new StringBuilder("Killing containers: [");
-
-      for (RMContainer c : assignment.getContainersToKill()) {
-        FiCaSchedulerApp application = csContext.getApplicationAttempt(
-            c.getApplicationAttemptId());
-        LeafQueue q = application.getCSLeafQueue();
-        q.completedContainer(clusterResource, application, node, c, SchedulerUtils
-            .createPreemptedContainerStatus(c.getContainerId(),
-                SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
-            null, false);
-        sb.append("(container=" + c.getContainerId() + " resource=" + c
-            .getAllocatedResource() + ")");
-      }
-
-      sb.append("] for container=" + assignment.getAssignmentInformation()
-          .getFirstAllocatedOrReservedContainerId() + " resource=" + assignment
-          .getResource());
-      LOG.info(sb.toString());
-
-    }
-  }
 
   private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
     // Set preemption-allowed:
@@ -971,174 +928,312 @@ public class LeafQueue extends AbstractCSQueue {
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
   }
 
+  private CSAssignment allocateFromReservedContainer(
+      Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps,
+      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    if (null == node) {
+      return null;
+    }
+
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      FiCaSchedulerApp application = getApplication(
+          reservedContainer.getApplicationAttemptId());
+
+      if (null != application) {
+        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+            node.getNodeID(), SystemClock.getInstance().getTime(), application);
+        CSAssignment assignment = application.assignContainers(clusterResource,
+            ps, currentResourceLimits, schedulingMode, reservedContainer);
+        return assignment;
+      }
+    }
+
+    return null;
+  }
+
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode) {
-    try {
-      writeLock.lock();
-      updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
+      SchedulingMode schedulingMode) {
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "assignContainers: node=" + node.getNodeName() + " #applications="
-                + orderingPolicy.getNumSchedulableEntities());
-      }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("assignContainers: partition=" + ps.getPartition()
+          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
+    }
 
-      setPreemptionAllowed(currentResourceLimits, node.getPartition());
+    setPreemptionAllowed(currentResourceLimits, ps.getPartition());
 
-      // Check for reserved resources
-      RMContainer reservedContainer = node.getReservedContainer();
-      if (reservedContainer != null) {
-        FiCaSchedulerApp application = getApplication(
-            reservedContainer.getApplicationAttemptId());
+    // Check for reserved resources, try to allocate reserved container first.
+    CSAssignment assignment = allocateFromReservedContainer(clusterResource,
+        ps, currentResourceLimits, schedulingMode);
+    if (null != assignment) {
+      return assignment;
+    }
 
-        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-            node.getNodeID(), SystemClock.getInstance().getTime(), application);
+    // if our queue cannot access this node, just return
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+        && !accessibleToPartition(ps.getPartition())) {
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + ps
+              .getPartition());
+      return CSAssignment.NULL_ASSIGNMENT;
+    }
 
-        CSAssignment assignment = application.assignContainers(clusterResource,
-            node, currentResourceLimits, schedulingMode, reservedContainer);
-        handleExcessReservedContainer(clusterResource, assignment, node,
-            application);
-        killToPreemptContainers(clusterResource, node, assignment);
-        return assignment;
-      }
+    // Check if this queue need more resource, simply skip allocation if this
+    // queue doesn't need more resources.
+    if (!hasPendingResourceRequest(ps.getPartition(), clusterResource,
+        schedulingMode)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip this queue=" + getQueuePath()
+            + ", because it doesn't need more resource, schedulingMode="
+            + schedulingMode.name() + " node-partition=" + ps.getPartition());
      }
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+      return CSAssignment.NULL_ASSIGNMENT;
+    }
 
-      // if our queue cannot access this node, just return
-      if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-          && !accessibleToPartition(node.getPartition())) {
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
-            ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
-                .getPartition());
-        return CSAssignment.NULL_ASSIGNMENT;
-      }
+    for (Iterator<FiCaSchedulerApp> assignmentIterator =
+         orderingPolicy.getAssignmentIterator();
+         assignmentIterator.hasNext(); ) {
+      FiCaSchedulerApp application = assignmentIterator.next();
+
+      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+          node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
+      // Check queue max-capacity limit
+      if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(),
+          currentResourceLimits, application.getCurrentReservation(),
+          schedulingMode)) {
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
+        return CSAssignment.NULL_ASSIGNMENT;
+      }
 
-      // Check if this queue need more resource, simply skip allocation if this
-      // queue doesn't need more resources.
-      if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
-          schedulingMode)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip this queue=" + getQueuePath()
-              + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-partition=" + node
-              .getPartition());
-        }
+      Resource userLimit = computeUserLimitAndSetHeadroom(application,
+          clusterResource, ps.getPartition(), schedulingMode);
+
+      // Check user limit
+      if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
+          application, ps.getPartition(), currentResourceLimits)) {
+        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
+            "User capacity has reached its maximum limit.");
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
+        continue;
+      }
+
+      // Try to schedule
+      assignment = application.assignContainers(clusterResource,
+          ps, currentResourceLimits, schedulingMode, null);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("post-assignContainers for application " + application
+            .getApplicationId());
+        application.showRequests();
+      }
+
+      // Did we schedule or reserve a container?
+      Resource assigned = assignment.getResource();
+
+      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
+          Resources.none())) {
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        return assignment;
+      } else if (assignment.getSkippedType()
+          == CSAssignment.SkippedType.OTHER) {
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+        application.updateNodeInfoForAMDiagnostics(node);
+      } else if (assignment.getSkippedType()
+          == CSAssignment.SkippedType.QUEUE_LIMIT) {
+        return assignment;
+      } else{
+        // If we don't allocate anything, and it is not skipped by application,
+        // we will return to respect FIFO of applications
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
             getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-            ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+            ActivityDiagnosticConstant.RESPECT_FIFO);
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
         return CSAssignment.NULL_ASSIGNMENT;
       }
+    }
+    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+        getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+        ActivityDiagnosticConstant.EMPTY);
 
-      for (Iterator<FiCaSchedulerApp> assignmentIterator =
-           orderingPolicy.getAssignmentIterator();
-           assignmentIterator.hasNext(); ) {
-        FiCaSchedulerApp application = assignmentIterator.next();
-
-        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-            node.getNodeID(), SystemClock.getInstance().getTime(), application);
+    return CSAssignment.NULL_ASSIGNMENT;
+  }
 
-        // Check queue max-capacity limit
-        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-            currentResourceLimits, application.getCurrentReservation(),
-            schedulingMode)) {
-          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-              activitiesManager, node, application, application.getPriority(),
-              ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-              ActivityDiagnosticConstant.EMPTY);
-          return CSAssignment.NULL_ASSIGNMENT;
+  @Override
+  public boolean accept(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
+        request.getFirstAllocatedOrReservedContainer();
+    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
+        allocation.getAllocatedOrReservedContainer();
+
+    // Do not check limits when allocation from a reserved container
+    if (allocation.getAllocateFromReservedContainer() == null) {
+      try {
+        readLock.lock();
+        FiCaSchedulerApp app =
+            schedulerContainer.getSchedulerApplicationAttempt();
+        String username = app.getUser();
+        String p = schedulerContainer.getNodePartition();
+
+        // check user-limit
+        Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
+            allocation.getSchedulingMode());
+
+        // Deduct resources that we can release
+        Resource usedResource = Resources.clone(getUser(username).getUsed(p));
+        Resources.subtractFrom(usedResource,
+            request.getTotalReleasedResource());
+
+        if (Resources.greaterThan(resourceCalculator, cluster, usedResource,
+            userLimit)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Used resource=" + usedResource
usedResource + " exceeded user-limit=" + + userLimit); + } + return false; } + } finally { + readLock.unlock(); + } + } - Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, node.getPartition(), schedulingMode); - - // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, node.getPartition(), currentResourceLimits)) { - application.updateAMContainerDiagnostics(AMState.ACTIVATED, - "User capacity has reached its maximum limit."); - ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( - activitiesManager, node, application, application.getPriority(), - ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT); - continue; - } + return super.accept(cluster, request); + } - // Try to schedule - CSAssignment assignment = application.assignContainers(clusterResource, - node, currentResourceLimits, schedulingMode, null); + private void internalReleaseContainer(Resource clusterResource, + SchedulerContainer schedulerContainer) { + RMContainer rmContainer = schedulerContainer.getRmContainer(); - if (LOG.isDebugEnabled()) { - LOG.debug("post-assignContainers for application " + application - .getApplicationId()); - application.showRequests(); + LeafQueue targetLeafQueue = + schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue(); + + if (targetLeafQueue == this) { + // When trying to preempt containers from the same queue + if (rmContainer.hasIncreaseReservation()) { + // Increased container reservation + unreserveIncreasedContainer(clusterResource, + schedulerContainer.getSchedulerApplicationAttempt(), + schedulerContainer.getSchedulerNode(), rmContainer); + } else if (rmContainer.getState() == RMContainerState.RESERVED) { + // For other reserved containers + // This is a reservation exchange, complete previous reserved container + completedContainer(clusterResource, + schedulerContainer.getSchedulerApplicationAttempt(), + schedulerContainer.getSchedulerNode(), rmContainer, SchedulerUtils + .createAbnormalContainerStatus(rmContainer.getContainerId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED, null, false); + } + } else{ + // When trying to preempt containers from different queue -- this + // is for lazy preemption feature (kill preemption candidate in scheduling + // cycle). + targetLeafQueue.completedContainer(clusterResource, + schedulerContainer.getSchedulerApplicationAttempt(), + schedulerContainer.getSchedulerNode(), + schedulerContainer.getRmContainer(), SchedulerUtils + .createPreemptedContainerStatus(rmContainer.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL, null, false); + } + } + + private void releaseContainers(Resource clusterResource, + ResourceCommitRequest request) { + for (SchedulerContainer c : request + .getContainersToRelease()) { + internalReleaseContainer(clusterResource, c); + } + + // Handle container reservation looking, or lazy preemption case: + if (null != request.getContainersToAllocate() && !request + .getContainersToAllocate().isEmpty()) { + for (ContainerAllocationProposal context : request + .getContainersToAllocate()) { + if (null != context.getToRelease()) { + for (SchedulerContainer c : context + .getToRelease()) { + internalReleaseContainer(clusterResource, c); + } } + } + } + } - // Did we schedule or reserve a container? - Resource assigned = assignment.getResource(); + public void apply(Resource cluster, + ResourceCommitRequest request) { + // Do we need to call parent queue's apply? 
+
+  public void apply(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    // Do we need to call parent queue's apply?
+    boolean applyToParentQueue = false;
 
-        // Did we schedule or reserve a container?
-        Resource assigned = assignment.getResource();
+    releaseContainers(cluster, request);
 
-        handleExcessReservedContainer(clusterResource, assignment, node,
-            application);
-        killToPreemptContainers(clusterResource, node, assignment);
+    try {
+      writeLock.lock();
+      if (request.anythingAllocatedOrReserved()) {
+        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
+            allocation = request.getFirstAllocatedOrReservedContainer();
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = allocation.getAllocatedOrReservedContainer();
+
+        // Do not modify queue when allocation from reserved container
+        if (allocation.getAllocateFromReservedContainer() == null) {
+          // Only invoke apply() of ParentQueue when new allocation /
+          // reservation happen.
+          applyToParentQueue = true;
 
-        if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
-            Resources.none())) {
-          // Get reserved or allocated container from application
-          RMContainer reservedOrAllocatedRMContainer =
-              application.getRMContainer(assignment.getAssignmentInformation()
-                  .getFirstAllocatedOrReservedContainerId());
-
           // Book-keeping
           // Note: Update headroom to account for current allocation too...
-          allocateResource(clusterResource, application, assigned,
-              node.getPartition(), reservedOrAllocatedRMContainer,
-              assignment.isIncreasedAllocation());
-
-          // Update reserved metrics
-          Resource reservedRes =
-              assignment.getAssignmentInformation().getReserved();
-          if (reservedRes != null && !reservedRes.equals(Resources.none())) {
-            incReservedResource(node.getPartition(), reservedRes);
-          }
+          allocateResource(cluster,
+              schedulerContainer.getSchedulerApplicationAttempt(),
+              allocation.getAllocatedOrReservedResource(),
+              schedulerContainer.getNodePartition(),
+              schedulerContainer.getRmContainer(),
+              allocation.isIncreasedAllocation());
+          orderingPolicy.containerAllocated(
+              schedulerContainer.getSchedulerApplicationAttempt(),
+              schedulerContainer.getRmContainer());
+        }
 
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParent().getQueueName(), getQueueName(),
-              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-
-          // Done
-          return assignment;
-        } else if (assignment.getSkippedType()
-            == CSAssignment.SkippedType.OTHER) {
-          ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-              activitiesManager, application.getApplicationId(),
-              ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
-          application.updateNodeInfoForAMDiagnostics(node);
-        } else if (assignment.getSkippedType()
-            == CSAssignment.SkippedType.QUEUE_LIMIT) {
-          return assignment;
-        } else{
-          // If we don't allocate anything, and it is not skipped by application,
-          // we will return to respect FIFO of applications
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-              ActivityDiagnosticConstant.RESPECT_FIFO);
-          ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-              activitiesManager, application.getApplicationId(),
-              ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
-          return CSAssignment.NULL_ASSIGNMENT;
+        // Update reserved resource
+        if (Resources.greaterThan(resourceCalculator, cluster,
+            request.getTotalReservedResource(), Resources.none())) {
+          incReservedResource(schedulerContainer.getNodePartition(),
+              request.getTotalReservedResource());
         }
       }
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-          getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-          ActivityDiagnosticConstant.EMPTY);
-
-      return CSAssignment.NULL_ASSIGNMENT;
     } finally {
       writeLock.unlock();
     }
+
+    if (parent != null && applyToParentQueue) {
+      parent.apply(cluster, request);
+    }
   }
+
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application) {
     return getHeadroom(user, queueCurrentLimit, clusterResource, application,
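Aside: to exercise the new code paths, asynchronous scheduling has to be
enabled and, optionally, given more threads. A sketch using a plain Hadoop
Configuration; the exact property names are an assumption derived from
SCHEDULE_ASYNCHRONOUSLY_PREFIX and the new ".maximum-threads" suffix added
above:

    import org.apache.hadoop.conf.Configuration;

    public class DemoAsyncConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Turn on async scheduling (assumed key; see
        // SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable").
        conf.setBoolean(
            "yarn.scheduler.capacity.schedule-asynchronously.enable", true);
        // Size the scheduling-thread pool (assumed key; see
        // SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD above).
        conf.setInt(
            "yarn.scheduler.capacity.schedule-asynchronously.maximum-threads",
            4);
        System.out.println("async scheduling threads = " + conf.getInt(
            "yarn.scheduler.capacity.schedule-asynchronously.maximum-threads",
            1));
      }
    }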