Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 774A9200C2C for ; Fri, 3 Mar 2017 23:05:07 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 75F3B160B85; Fri, 3 Mar 2017 22:05:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 39EA7160B6D for ; Fri, 3 Mar 2017 23:05:05 +0100 (CET) Received: (qmail 8782 invoked by uid 500); 3 Mar 2017 22:04:54 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 7311 invoked by uid 99); 3 Mar 2017 22:04:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Mar 2017 22:04:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C8FECF320E; Fri, 3 Mar 2017 22:04:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Fri, 03 Mar 2017 22:05:18 -0000 Message-Id: In-Reply-To: <275bda3b88a54b959537eb82f6111bf2@git.apache.org> References: <275bda3b88a54b959537eb82f6111bf2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [26/50] [abbrv] hadoop git commit: YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda) archived-at: Fri, 03 Mar 2017 22:05:07 -0000 YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eac6b4c3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eac6b4c3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eac6b4c3 Branch: refs/heads/HDFS-7240 Commit: eac6b4c35c50e555c2f1b5f913bb2c4d839f1ff4 Parents: 480b4dd Author: Wangda Tan Authored: Tue Feb 28 10:35:50 2017 -0800 Committer: Wangda Tan Committed: Tue Feb 28 10:35:50 2017 -0800 ---------------------------------------------------------------------- .../sls/scheduler/ResourceSchedulerWrapper.java | 8 - .../server/scheduler/SchedulerRequestKey.java | 12 +- .../server/resourcemanager/RMServerUtils.java | 27 +- .../rmcontainer/RMContainer.java | 4 - .../RMContainerChangeResourceEvent.java | 44 --- .../rmcontainer/RMContainerImpl.java | 46 --- .../scheduler/AbstractYarnScheduler.java | 171 +++++++--- .../scheduler/AppSchedulingInfo.java | 283 +--------------- .../scheduler/ContainerUpdateContext.java | 193 ++++++++--- .../scheduler/SchedulerApplicationAttempt.java | 212 ++++-------- .../scheduler/SchedulerNode.java | 44 --- .../scheduler/capacity/AbstractCSQueue.java | 13 +- .../scheduler/capacity/CSQueue.java | 15 - .../scheduler/capacity/CapacityScheduler.java | 121 +------ .../scheduler/capacity/LeafQueue.java | 152 +-------- .../scheduler/capacity/ParentQueue.java | 53 +-- .../capacity/allocator/ContainerAllocator.java | 31 +- .../allocator/IncreaseContainerAllocator.java | 337 ------------------- .../common/ContainerAllocationProposal.java | 9 +- .../scheduler/common/fica/FiCaSchedulerApp.java | 245 +++----------- .../common/fica/FiCaSchedulerNode.java | 14 - .../scheduler/fair/FairScheduler.java | 11 +- .../scheduler/fifo/FifoScheduler.java | 8 - .../scheduler/capacity/TestChildQueueOrder.java | 4 +- .../capacity/TestContainerResizing.java | 134 +------- .../capacity/TestIncreaseAllocationExpirer.java | 12 +- .../scheduler/capacity/TestLeafQueue.java | 4 +- .../scheduler/capacity/TestParentQueue.java | 4 +- 28 files changed, 482 insertions(+), 1729 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 5517362..df8323a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -969,12 +969,4 @@ final public class ResourceSchedulerWrapper return Priority.newInstance(0); } - @Override - protected void decreaseContainer( - SchedContainerChangeRequest decreaseRequest, - SchedulerApplicationAttempt attempt) { - // TODO Auto-generated method stub - - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java index 02539ba..c4f37f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -116,7 +116,17 @@ public final class SchedulerRequestKey implements if (priorityCompare != 0) { return priorityCompare; } - return Long.compare(allocationRequestId, o.getAllocationRequestId()); + int allocReqCompare = Long.compare( + allocationRequestId, o.getAllocationRequestId()); + + if (allocReqCompare != 0) { + return allocReqCompare; + } + + if (this.containerToUpdate != null && o.containerToUpdate != null) { + return (this.containerToUpdate.compareTo(o.containerToUpdate)); + } + return 0; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index e98141b..0aa7a2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -152,26 +152,16 @@ public class RMServerUtils { if (msg == null) { if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) && (updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) { - Resource original = rmContainer.getContainer().getResource(); - Resource target = updateReq.getCapability(); - if (Resources.fitsIn(target, original)) { - // This is a decrease request - if (validateIncreaseDecreaseRequest(rmContext, updateReq, - maximumAllocation, false)) { - updateRequests.getDecreaseRequests().add(updateReq); - outstandingUpdate.add(updateReq.getContainerId()); - } else { - msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; - } - } else { - // This is an increase request - if (validateIncreaseDecreaseRequest(rmContext, updateReq, - maximumAllocation, true)) { + if (validateIncreaseDecreaseRequest( + rmContext, updateReq, maximumAllocation)) { + if (ContainerUpdateType.INCREASE_RESOURCE == updateType) { updateRequests.getIncreaseRequests().add(updateReq); - outstandingUpdate.add(updateReq.getContainerId()); } else { - msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + updateRequests.getDecreaseRequests().add(updateReq); } + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; } } else { ExecutionType original = rmContainer.getExecutionType(); @@ -329,8 +319,7 @@ public class RMServerUtils { // Sanity check and normalize target resource private static boolean validateIncreaseDecreaseRequest(RMContext rmContext, - UpdateContainerRequest request, Resource maximumAllocation, - boolean increase) { + UpdateContainerRequest request, Resource maximumAllocation) { if (request.getCapability().getMemorySize() < 0 || request.getCapability().getMemorySize() > maximumAllocation .getMemorySize()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 020764b..7ad381e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -91,10 +91,6 @@ public interface RMContainer extends EventHandler { String getNodeHttpAddress(); String getNodeLabelExpression(); - - boolean hasIncreaseReservation(); - - void cancelIncreaseReservation(); String getQueueName(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java deleted file mode 100644 index 920cfdb..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; - -public class RMContainerChangeResourceEvent extends RMContainerEvent { - - final Resource targetResource; - final boolean increase; - - public RMContainerChangeResourceEvent(ContainerId containerId, - Resource targetResource, boolean increase) { - super(containerId, RMContainerEventType.CHANGE_RESOURCE); - - this.targetResource = targetResource; - this.increase = increase; - } - - public Resource getTargetResource() { - return targetResource; - } - - public boolean isIncrease() { - return increase; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 72ce1a0..12fbbea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -131,8 +131,6 @@ public class RMContainerImpl implements RMContainer, Comparable { .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.RESERVED, new ContainerReservedTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, - RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition()) - .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.ACQUIRE_UPDATED_CONTAINER, new ContainerAcquiredWhileRunningTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, @@ -183,7 +181,6 @@ public class RMContainerImpl implements RMContainer, Comparable { private boolean isAMContainer; private List resourceRequests; - private volatile boolean hasIncreaseReservation = false; // Only used for container resource increase and decrease. This is the // resource to rollback to should container resource increase token expires. private Resource lastConfirmedResource; @@ -561,12 +558,6 @@ public class RMContainerImpl implements RMContainer, Comparable { if (c != null) { c.setNodeId(container.reservedNode); } - - if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED) - .contains(container.getState())) { - // When container's state != NEW/RESERVED, it is an increase reservation - container.hasIncreaseReservation = true; - } } } @@ -681,33 +672,6 @@ public class RMContainerImpl implements RMContainer, Comparable { } } } - - private static final class ChangeResourceTransition extends BaseTransition { - - @Override - public void transition(RMContainerImpl container, RMContainerEvent event) { - RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event; - - Resource targetResource = changeEvent.getTargetResource(); - Resource lastConfirmedResource = container.lastConfirmedResource; - - if (!changeEvent.isIncrease()) { - // Only unregister from the containerAllocationExpirer when target - // resource is less than or equal to the last confirmed resource. - if (Resources.fitsIn(targetResource, lastConfirmedResource)) { - container.lastConfirmedResource = targetResource; - container.containerAllocationExpirer.unregister( - new AllocationExpirationInfo(event.getContainerId())); - } - } - - container.container.setResource(targetResource); - - // We reach here means we either allocated increase reservation OR - // decreased container, reservation will be cancelled anyway. - container.hasIncreaseReservation = false; - } - } private static class FinishedTransition extends BaseTransition { @@ -857,16 +821,6 @@ public class RMContainerImpl implements RMContainer, Comparable { return -1; } - @Override - public boolean hasIncreaseReservation() { - return hasIncreaseReservation; - } - - @Override - public void cancelIncreaseReservation() { - hasIncreaseReservation = false; - } - public void setQueueName(String queueName) { this.queueName = queueName; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index ce6d2a2..213839d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.List; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -85,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; + import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -597,6 +600,8 @@ public abstract class AbstractYarnScheduler if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { completedContainerInternal(rmContainer, containerStatus, event); + completeOustandingUpdatesWhichAreReserved( + rmContainer, containerStatus, event); } else { ContainerId containerId = rmContainer.getContainerId(); // Inform the container @@ -622,6 +627,33 @@ public abstract class AbstractYarnScheduler recoverResourceRequestForContainer(rmContainer); } + // Optimization: + // Check if there are in-flight container updates and complete the + // associated temp containers. These are removed when the app completes, + // but removing them when the actual container completes would allow the + // scheduler to reallocate those resources sooner. + private void completeOustandingUpdatesWhichAreReserved( + RMContainer rmContainer, ContainerStatus containerStatus, + RMContainerEventType event) { + N schedulerNode = getSchedulerNode(rmContainer.getNodeId()); + if (schedulerNode != null && + schedulerNode.getReservedContainer() != null) { + RMContainer resContainer = schedulerNode.getReservedContainer(); + if (resContainer.getReservedSchedulerKey() != null) { + ContainerId containerToUpdate = resContainer + .getReservedSchedulerKey().getContainerToUpdate(); + if (containerToUpdate != null && + containerToUpdate.equals(containerStatus.getContainerId())) { + completedContainerInternal(resContainer, + ContainerStatus.newInstance(resContainer.getContainerId(), + containerStatus.getState(), containerStatus + .getDiagnostics(), + containerStatus.getExitStatus()), event); + } + } + } + } + // clean up a completed container protected abstract void completedContainerInternal(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event); @@ -650,28 +682,6 @@ public abstract class AbstractYarnScheduler } } - protected void decreaseContainers( - List decreaseRequests, - SchedulerApplicationAttempt attempt) { - if (null == decreaseRequests || decreaseRequests.isEmpty()) { - return; - } - // Pre-process decrease requests - List schedDecreaseRequests = - createSchedContainerChangeRequests(decreaseRequests, false); - for (SchedContainerChangeRequest request : schedDecreaseRequests) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing decrease request:" + request); - } - // handle decrease request - decreaseContainer(request, attempt); - } - } - - protected abstract void decreaseContainer( - SchedContainerChangeRequest decreaseRequest, - SchedulerApplicationAttempt attempt); - @Override public N getSchedulerNode(NodeId nodeId) { return nodeTracker.getNode(nodeId); @@ -1074,21 +1084,39 @@ public abstract class AbstractYarnScheduler } } - protected void handleExecutionTypeUpdates( - SchedulerApplicationAttempt appAttempt, - List promotionRequests, - List demotionRequests) { + protected void handleContainerUpdates( + SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) { + List promotionRequests = + updates.getPromotionRequests(); if (promotionRequests != null && !promotionRequests.isEmpty()) { LOG.info("Promotion Update requests : " + promotionRequests); - handlePromotionRequests(appAttempt, promotionRequests); + // Promotion is technically an increase request from + // 0 resources to target resources. + handleIncreaseRequests(appAttempt, promotionRequests); } + List increaseRequests = + updates.getIncreaseRequests(); + if (increaseRequests != null && !increaseRequests.isEmpty()) { + LOG.info("Resource increase requests : " + increaseRequests); + handleIncreaseRequests(appAttempt, increaseRequests); + } + List demotionRequests = + updates.getDemotionRequests(); if (demotionRequests != null && !demotionRequests.isEmpty()) { LOG.info("Demotion Update requests : " + demotionRequests); - handleDemotionRequests(appAttempt, demotionRequests); + // Demotion is technically a decrease request from initial + // to 0 resources + handleDecreaseRequests(appAttempt, demotionRequests); + } + List decreaseRequests = + updates.getDecreaseRequests(); + if (decreaseRequests != null && !decreaseRequests.isEmpty()) { + LOG.info("Resource decrease requests : " + decreaseRequests); + handleDecreaseRequests(appAttempt, decreaseRequests); } } - private void handlePromotionRequests( + private void handleIncreaseRequests( SchedulerApplicationAttempt applicationAttempt, List updateContainerRequests) { for (UpdateContainerRequest uReq : updateContainerRequests) { @@ -1118,7 +1146,7 @@ public abstract class AbstractYarnScheduler } } - private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt, + private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt, List demotionRequests) { OpportunisticContainerContext oppCntxt = appAttempt.getOpportunisticContainerContext(); @@ -1126,24 +1154,59 @@ public abstract class AbstractYarnScheduler RMContainer rmContainer = rmContext.getScheduler().getRMContainer(uReq.getContainerId()); if (rmContainer != null) { - if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases( - rmContainer.getContainer())) { - RMContainer demotedRMContainer = - createDemotedRMContainer(appAttempt, oppCntxt, rmContainer); - appAttempt.addToNewlyDemotedContainers( - uReq.getContainerId(), demotedRMContainer); + SchedulerNode schedulerNode = rmContext.getScheduler() + .getSchedulerNode(rmContainer.getContainer().getNodeId()); + if (appAttempt.getUpdateContext() + .checkAndAddToOutstandingDecreases(uReq, schedulerNode, + rmContainer.getContainer())) { + if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == + uReq.getContainerUpdateType()) { + RMContainer demotedRMContainer = + createDemotedRMContainer(appAttempt, oppCntxt, rmContainer); + appAttempt.addToNewlyDemotedContainers( + uReq.getContainerId(), demotedRMContainer); + } else { + RMContainer demotedRMContainer = createDecreasedRMContainer( + appAttempt, uReq, rmContainer); + appAttempt.addToNewlyDecreasedContainers( + uReq.getContainerId(), demotedRMContainer); + } } else { appAttempt.addToUpdateContainerErrors( UpdateContainerError.newInstance( RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); } } else { - LOG.warn("Cannot demote non-existent (or completed) Container [" - + uReq.getContainerId() + "]"); + LOG.warn("Cannot demote/decrease non-existent (or completed) " + + "Container [" + uReq.getContainerId() + "]"); } } } + private RMContainer createDecreasedRMContainer( + SchedulerApplicationAttempt appAttempt, UpdateContainerRequest uReq, + RMContainer rmContainer) { + SchedulerRequestKey sk = + SchedulerRequestKey.extractFrom(rmContainer.getContainer()); + Container decreasedContainer = BuilderUtils.newContainer( + ContainerId.newContainerId(appAttempt.getApplicationAttemptId(), + appAttempt.getNewContainerId()), + rmContainer.getContainer().getNodeId(), + rmContainer.getContainer().getNodeHttpAddress(), + Resources.none(), + sk.getPriority(), null, rmContainer.getExecutionType(), + sk.getAllocationRequestId()); + decreasedContainer.setVersion(rmContainer.getContainer().getVersion()); + RMContainer newRmContainer = new RMContainerImpl(decreasedContainer, + sk, appAttempt.getApplicationAttemptId(), + decreasedContainer.getNodeId(), appAttempt.getUser(), rmContext, + rmContainer.isRemotelyAllocated()); + appAttempt.addRMContainer(decreasedContainer.getId(), rmContainer); + ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( + decreasedContainer.getNodeId()).allocateContainer(newRmContainer); + return newRmContainer; + } + private RMContainer createDemotedRMContainer( SchedulerApplicationAttempt appAttempt, OpportunisticContainerContext oppCntxt, @@ -1162,4 +1225,36 @@ public abstract class AbstractYarnScheduler return SchedulerUtils.createOpportunisticRmContainer( rmContext, demotedContainer, false); } + + /** + * Rollback container update after expiry. + * @param containerId ContainerId. + */ + protected void rollbackContainerUpdate( + ContainerId containerId) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + LOG.info("Cannot rollback resource for container " + containerId + + ". The container does not exist."); + return; + } + T app = getCurrentAttemptForContainer(containerId); + if (getCurrentAttemptForContainer(containerId) == null) { + LOG.info("Cannot rollback resource for container " + containerId + + ". The application that the container " + + "belongs to does not exist."); + return; + } + + if (Resources.fitsIn(rmContainer.getLastConfirmedResource(), + rmContainer.getContainer().getResource())) { + LOG.info("Roll back resource for container " + containerId); + handleDecreaseRequests(app, Arrays.asList( + UpdateContainerRequest.newInstance( + rmContainer.getContainer().getVersion(), + rmContainer.getContainerId(), + ContainerUpdateType.DECREASE_RESOURCE, + rmContainer.getLastConfirmedResource(), null))); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 48ecd2e..bff9c41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -90,9 +90,7 @@ public class AppSchedulingInfo { schedulerKeys = new ConcurrentSkipListMap<>(); final Map> schedulerKeyToPlacementSets = new ConcurrentHashMap<>(); - final Map>> containerIncreaseRequestMap = - new ConcurrentHashMap<>(); + private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -158,137 +156,6 @@ public class AppSchedulingInfo { LOG.info("Application " + applicationId + " requests cleared"); } - public boolean hasIncreaseRequest(NodeId nodeId) { - try { - this.readLock.lock(); - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - return requestsOnNode == null ? false : requestsOnNode.size() > 0; - } finally { - this.readLock.unlock(); - } - } - - public Map - getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) { - try { - this.readLock.lock(); - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - return requestsOnNode == null ? null : requestsOnNode.get( - schedulerKey); - } finally { - this.readLock.unlock(); - } - } - - /** - * return true if any of the existing increase requests are updated, - * false if none of them are updated - */ - public boolean updateIncreaseRequests( - List increaseRequests) { - boolean resourceUpdated = false; - - try { - this.writeLock.lock(); - for (SchedContainerChangeRequest r : increaseRequests) { - if (r.getRMContainer().getState() != RMContainerState.RUNNING) { - LOG.warn("rmContainer's state is not RUNNING, for increase request" - + " with container-id=" + r.getContainerId()); - continue; - } - try { - RMServerUtils.checkSchedContainerChangeRequest(r, true); - } catch (YarnException e) { - LOG.warn("Error happens when checking increase request, Ignoring.." - + " exception=", e); - continue; - } - NodeId nodeId = r.getRMContainer().getAllocatedNode(); - - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - requestsOnNode = new TreeMap<>(); - containerIncreaseRequestMap.put(nodeId, requestsOnNode); - } - - SchedContainerChangeRequest prevChangeRequest = - getIncreaseRequest(nodeId, - r.getRMContainer().getAllocatedSchedulerKey(), - r.getContainerId()); - if (null != prevChangeRequest) { - if (Resources.equals(prevChangeRequest.getTargetCapacity(), - r.getTargetCapacity())) { - // increase request hasn't changed - continue; - } - - // remove the old one, as we will use the new one going forward - removeIncreaseRequest(nodeId, - prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(), - prevChangeRequest.getContainerId()); - } - - if (Resources.equals(r.getTargetCapacity(), - r.getRMContainer().getAllocatedResource())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to increase container " + r.getContainerId() - + ", target capacity = previous capacity = " + prevChangeRequest - + ". Will ignore this increase request."); - } - continue; - } - - // add the new one - resourceUpdated = true; - insertIncreaseRequest(r); - } - return resourceUpdated; - } finally { - this.writeLock.unlock(); - } - } - - /** - * Insert increase request, adding any missing items in the data-structure - * hierarchy. - */ - private void insertIncreaseRequest(SchedContainerChangeRequest request) { - NodeId nodeId = request.getNodeId(); - SchedulerRequestKey schedulerKey = - request.getRMContainer().getAllocatedSchedulerKey(); - ContainerId containerId = request.getContainerId(); - - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - requestsOnNode = new HashMap<>(); - containerIncreaseRequestMap.put(nodeId, requestsOnNode); - } - - Map requestsOnNodeWithPriority = - requestsOnNode.get(schedulerKey); - if (null == requestsOnNodeWithPriority) { - requestsOnNodeWithPriority = new TreeMap<>(); - requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority); - incrementSchedulerKeyReference(schedulerKey); - } - - requestsOnNodeWithPriority.put(containerId, request); - - // update resources - String partition = request.getRMContainer().getNodeLabelExpression(); - Resource delta = request.getDeltaCapacity(); - appResourceUsage.incPending(partition, delta); - queue.incPendingResource(partition, delta); - - if (LOG.isDebugEnabled()) { - LOG.debug("Added increase request:" + request.getContainerId() - + " delta=" + delta); - } - } private void incrementSchedulerKeyReference( SchedulerRequestKey schedulerKey) { @@ -312,73 +179,6 @@ public class AppSchedulingInfo { } } - public boolean removeIncreaseRequest(NodeId nodeId, - SchedulerRequestKey schedulerKey, ContainerId containerId) { - try { - this.writeLock.lock(); - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - return false; - } - - Map requestsOnNodeWithPriority = - requestsOnNode.get(schedulerKey); - if (null == requestsOnNodeWithPriority) { - return false; - } - - SchedContainerChangeRequest request = - requestsOnNodeWithPriority.remove(containerId); - - // remove hierarchies if it becomes empty - if (requestsOnNodeWithPriority.isEmpty()) { - requestsOnNode.remove(schedulerKey); - decrementSchedulerKeyReference(schedulerKey); - } - if (requestsOnNode.isEmpty()) { - containerIncreaseRequestMap.remove(nodeId); - } - - if (request == null) { - return false; - } - - // update queue's pending resource if request exists - String partition = request.getRMContainer().getNodeLabelExpression(); - Resource delta = request.getDeltaCapacity(); - appResourceUsage.decPending(partition, delta); - queue.decPendingResource(partition, delta); - - if (LOG.isDebugEnabled()) { - LOG.debug("remove increase request:" + request); - } - - return true; - } finally { - this.writeLock.unlock(); - } - } - - public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, - SchedulerRequestKey schedulerKey, ContainerId containerId) { - try { - this.readLock.lock(); - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - return null; - } - - Map requestsOnNodeWithPriority = - requestsOnNode.get(schedulerKey); - return requestsOnNodeWithPriority == null ? null - : requestsOnNodeWithPriority.get(containerId); - } finally { - this.readLock.unlock(); - } - } - public ContainerUpdateContext getUpdateContext() { return updateContext; } @@ -514,21 +314,6 @@ public class AppSchedulingInfo { appResourceUsage.decPending(partition, toDecrease); } - private boolean hasRequestLabelChanged(ResourceRequest requestOne, - ResourceRequest requestTwo) { - String requestOneLabelExp = requestOne.getNodeLabelExpression(); - String requestTwoLabelExp = requestTwo.getNodeLabelExpression(); - // First request label expression can be null and second request - // is not null then we have to consider it as changed. - if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) { - return true; - } - // If the label is not matching between both request when - // requestOneLabelExp is not null. - return ((null != requestOneLabelExp) && !(requestOneLabelExp - .equals(requestTwoLabelExp))); - } - /** * The ApplicationMaster is updating the placesBlacklistedByApp used for * containers other than AMs. @@ -601,22 +386,6 @@ public class AppSchedulingInfo { return ret; } - public SchedulingPlacementSet getFirstSchedulingPlacementSet() { - try { - readLock.lock(); - for (SchedulerRequestKey key : schedulerKeys.keySet()) { - SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(key); - if (null != ps) { - return ps; - } - } - return null; - } finally { - readLock.unlock(); - } - - } - public PendingAsk getNextPendingAsk() { try { readLock.lock(); @@ -666,56 +435,6 @@ public class AppSchedulingInfo { } } - public void increaseContainer(SchedContainerChangeRequest increaseRequest) { - NodeId nodeId = increaseRequest.getNodeId(); - SchedulerRequestKey schedulerKey = - increaseRequest.getRMContainer().getAllocatedSchedulerKey(); - ContainerId containerId = increaseRequest.getContainerId(); - Resource deltaCapacity = increaseRequest.getDeltaCapacity(); - - if (LOG.isDebugEnabled()) { - LOG.debug("allocated increase request : applicationId=" + applicationId - + " container=" + containerId + " host=" - + increaseRequest.getNodeId() + " user=" + user + " resource=" - + deltaCapacity); - } - try { - this.writeLock.lock(); - // Set queue metrics - queue.getMetrics().allocateResources(user, deltaCapacity); - // remove the increase request from pending increase request map - removeIncreaseRequest(nodeId, schedulerKey, containerId); - // update usage - appResourceUsage.incUsed(increaseRequest.getNodePartition(), - deltaCapacity); - } finally { - this.writeLock.unlock(); - } - } - - public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) { - // Delta is negative when it's a decrease request - Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Decrease container : applicationId=" + applicationId - + " container=" + decreaseRequest.getContainerId() + " host=" - + decreaseRequest.getNodeId() + " user=" + user + " resource=" - + absDelta); - } - - try { - this.writeLock.lock(); - // Set queue metrics - queue.getMetrics().releaseResources(user, absDelta); - - // update usage - appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta); - } finally { - this.writeLock.unlock(); - } - } - public List allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, Container containerAllocated) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 7381250..5ac2ac5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -28,17 +28,19 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer + .RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement + .SchedulingPlacementSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -58,43 +60,37 @@ public class ContainerUpdateContext { private final Map>>> outstandingIncreases = new HashMap<>(); - private final Set outstandingDecreases = new HashSet<>(); + private final Map outstandingDecreases = + new HashMap<>(); private final AppSchedulingInfo appSchedulingInfo; ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) { this.appSchedulingInfo = appSchedulingInfo; } - private synchronized boolean isBeingIncreased(Container container) { - Map>> resourceMap = - outstandingIncreases.get( - new SchedulerRequestKey(container.getPriority(), - container.getAllocationRequestId(), container.getId())); - if (resourceMap != null) { - Map> locationMap = - resourceMap.get(container.getResource()); - if (locationMap != null) { - Set containerIds = locationMap.get(container.getNodeId()); - if (containerIds != null && !containerIds.isEmpty()) { - return containerIds.contains(container.getId()); - } - } - } - return false; - } - /** * Add the container to outstanding decreases. + * @param updateReq UpdateContainerRequest. + * @param schedulerNode SchedulerNode. * @param container Container. - * @return true if updated to outstanding decreases was successful. + * @return If it was possible to decrease the container. */ public synchronized boolean checkAndAddToOutstandingDecreases( + UpdateContainerRequest updateReq, SchedulerNode schedulerNode, Container container) { - if (isBeingIncreased(container) - || outstandingDecreases.contains(container.getId())) { + if (outstandingDecreases.containsKey(container.getId())) { return false; } - outstandingDecreases.add(container.getId()); + if (ContainerUpdateType.DECREASE_RESOURCE == + updateReq.getContainerUpdateType()) { + SchedulerRequestKey updateKey = new SchedulerRequestKey + (container.getPriority(), + container.getAllocationRequestId(), container.getId()); + cancelPreviousRequest(schedulerNode, updateKey); + outstandingDecreases.put(container.getId(), updateReq.getCapability()); + } else { + outstandingDecreases.put(container.getId(), container.getResource()); + } return true; } @@ -117,35 +113,63 @@ public class ContainerUpdateContext { if (resourceMap == null) { resourceMap = new HashMap<>(); outstandingIncreases.put(schedulerKey, resourceMap); + } else { + // Updating Resource for and existing increase container + if (ContainerUpdateType.INCREASE_RESOURCE == + updateRequest.getContainerUpdateType()) { + cancelPreviousRequest(schedulerNode, schedulerKey); + } else { + return false; + } } + Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer); Map> locationMap = - resourceMap.get(container.getResource()); + resourceMap.get(resToIncrease); if (locationMap == null) { locationMap = new HashMap<>(); - resourceMap.put(container.getResource(), locationMap); + resourceMap.put(resToIncrease, locationMap); } Set containerIds = locationMap.get(container.getNodeId()); if (containerIds == null) { containerIds = new HashSet<>(); locationMap.put(container.getNodeId(), containerIds); } - if (containerIds.contains(container.getId()) - || outstandingDecreases.contains(container.getId())) { + if (outstandingDecreases.containsKey(container.getId())) { return false; } - containerIds.add(container.getId()); - Map> updateResReqs = - new HashMap<>(); - Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer); - Map resMap = - createResourceRequests(rmContainer, schedulerNode, - schedulerKey, resToIncrease); - updateResReqs.put(schedulerKey, resMap); - appSchedulingInfo.addToPlacementSets(false, updateResReqs); + containerIds.add(container.getId()); + if (!Resources.isNone(resToIncrease)) { + Map> updateResReqs = + new HashMap<>(); + Map resMap = + createResourceRequests(rmContainer, schedulerNode, + schedulerKey, resToIncrease); + updateResReqs.put(schedulerKey, resMap); + appSchedulingInfo.addToPlacementSets(false, updateResReqs); + } return true; } + private void cancelPreviousRequest(SchedulerNode schedulerNode, + SchedulerRequestKey schedulerKey) { + SchedulingPlacementSet schedulingPlacementSet = + appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); + if (schedulingPlacementSet != null) { + Map resourceRequests = schedulingPlacementSet + .getResourceRequests(); + ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY); + // Decrement the pending using a dummy RR with + // resource = prev update req capability + if (prevReq != null) { + appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode, + schedulerKey, Container.newInstance(UNDEFINED, + schedulerNode.getNodeID(), "host:port", + prevReq.getCapability(), schedulerKey.getPriority(), null)); + } + } + } + private Map createResourceRequests( RMContainer rmContainer, SchedulerNode schedulerNode, SchedulerRequestKey schedulerKey, Resource resToIncrease) { @@ -171,10 +195,16 @@ public class ContainerUpdateContext { ContainerUpdateType.PROMOTE_EXECUTION_TYPE) { return rmContainer.getContainer().getResource(); } - // TODO: Fix this for container increase.. - // This has to equal the Resources in excess of fitsIn() - // for container increase and is equal to the container total - // resource for Promotion. + if (updateReq.getContainerUpdateType() == + ContainerUpdateType.INCREASE_RESOURCE) { + // This has to equal the Resources in excess of fitsIn() + // for container increase and is equal to the container total + // resource for Promotion. + Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(), + rmContainer.getContainer().getResource()); + return Resources.add(maxCap, + Resources.negate(rmContainer.getContainer().getResource())); + } return null; } @@ -228,6 +258,7 @@ public class ContainerUpdateContext { /** * Check if a new container is to be matched up against an outstanding * Container increase request. + * @param node SchedulerNode. * @param schedulerKey SchedulerRequestKey. * @param rmContainer RMContainer. * @return ContainerId. @@ -264,4 +295,80 @@ public class ContainerUpdateContext { } return retVal; } + + /** + * Swaps the existing RMContainer's and the temp RMContainers internal + * container references after adjusting the resources in each. + * @param tempRMContainer Temp RMContainer. + * @param existingRMContainer Existing RMContainer. + * @param updateType Update Type. + * @return Existing RMContainer after swapping the container references. + */ + public RMContainer swapContainer(RMContainer tempRMContainer, + RMContainer existingRMContainer, ContainerUpdateType updateType) { + ContainerId matchedContainerId = existingRMContainer.getContainerId(); + // Swap updated container with the existing container + Container tempContainer = tempRMContainer.getContainer(); + + Resource updatedResource = createUpdatedResource( + tempContainer, existingRMContainer.getContainer(), updateType); + Resource resourceToRelease = createResourceToRelease( + existingRMContainer.getContainer(), updateType); + Container newContainer = Container.newInstance(matchedContainerId, + existingRMContainer.getContainer().getNodeId(), + existingRMContainer.getContainer().getNodeHttpAddress(), + updatedResource, + existingRMContainer.getContainer().getPriority(), null, + tempContainer.getExecutionType()); + newContainer.setAllocationRequestId( + existingRMContainer.getContainer().getAllocationRequestId()); + newContainer.setVersion(existingRMContainer.getContainer().getVersion()); + + tempRMContainer.getContainer().setResource(resourceToRelease); + tempRMContainer.getContainer().setExecutionType( + existingRMContainer.getContainer().getExecutionType()); + + ((RMContainerImpl)existingRMContainer).setContainer(newContainer); + return existingRMContainer; + } + + /** + * Returns the resource that the container will finally be assigned with + * at the end of the update operation. + * @param tempContainer Temporary Container created for the operation. + * @param existingContainer Existing Container. + * @param updateType Update Type. + * @return Final Resource. + */ + private Resource createUpdatedResource(Container tempContainer, + Container existingContainer, ContainerUpdateType updateType) { + if (ContainerUpdateType.INCREASE_RESOURCE == updateType) { + return Resources.add(existingContainer.getResource(), + tempContainer.getResource()); + } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) { + return outstandingDecreases.get(existingContainer.getId()); + } else { + return existingContainer.getResource(); + } + } + + /** + * Returns the resources that need to be released at the end of the update + * operation. + * @param existingContainer Existing Container. + * @param updateType Updated type. + * @return Resources to be released. + */ + private Resource createResourceToRelease(Container existingContainer, + ContainerUpdateType updateType) { + if (ContainerUpdateType.INCREASE_RESOURCE == updateType) { + return Resources.none(); + } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType){ + return Resources.add(existingContainer.getResource(), + Resources.negate( + outstandingDecreases.get(existingContainer.getId()))); + } else { + return existingContainer.getResource(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 0e79838..f894a40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; @@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.rmnode + .RMNodeDecreaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; @@ -136,9 +139,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); protected List newlyAllocatedContainers = new ArrayList<>(); + protected List tempContainerToKill = new ArrayList<>(); protected Map newlyPromotedContainers = new HashMap<>(); protected Map newlyDemotedContainers = new HashMap<>(); - protected List tempContainerToKill = new ArrayList<>(); protected Map newlyDecreasedContainers = new HashMap<>(); protected Map newlyIncreasedContainers = new HashMap<>(); protected Set updatedNMTokens = new HashSet<>(); @@ -670,6 +673,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { rmContainer.handle(new RMContainerUpdatesAcquiredEvent( rmContainer.getContainerId(), ContainerUpdateType.INCREASE_RESOURCE == updateType)); + if (ContainerUpdateType.DECREASE_RESOURCE == updateType) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(), + Collections.singletonList(rmContainer.getContainer()))); + } } return container; } @@ -717,11 +725,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } } - public void addToNewlyDemotedContainers(ContainerId containerId, + public synchronized void addToNewlyDemotedContainers(ContainerId containerId, RMContainer rmContainer) { newlyDemotedContainers.put(containerId, rmContainer); } + public synchronized void addToNewlyDecreasedContainers( + ContainerId containerId, RMContainer rmContainer) { + newlyDecreasedContainers.put(containerId, rmContainer); + } + protected synchronized void addToUpdateContainerErrors( UpdateContainerError error) { updateContainerErrors.add(error); @@ -729,10 +742,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { protected synchronized void addToNewlyAllocatedContainers( SchedulerNode node, RMContainer rmContainer) { - if (oppContainerContext == null) { - newlyAllocatedContainers.add(rmContainer); - return; - } ContainerId matchedContainerId = getUpdateContext().matchContainerToOutstandingIncreaseReq( node, rmContainer.getAllocatedSchedulerKey(), rmContainer); @@ -745,7 +754,21 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { // occurs when using MiniYARNCluster to test). tempContainerToKill.add(rmContainer); } else { - newlyPromotedContainers.put(matchedContainerId, rmContainer); + RMContainer existingContainer = getRMContainer(matchedContainerId); + // If this container was already GUARANTEED, then it is an + // increase, else its a promotion + if (existingContainer == null || + EnumSet.of(RMContainerState.COMPLETED, RMContainerState.KILLED, + RMContainerState.EXPIRED, RMContainerState.RELEASED).contains( + existingContainer.getState())) { + tempContainerToKill.add(rmContainer); + } else { + if (ExecutionType.GUARANTEED == existingContainer.getExecutionType()) { + newlyIncreasedContainers.put(matchedContainerId, rmContainer); + } else { + newlyPromotedContainers.put(matchedContainerId, rmContainer); + } + } } } else { newlyAllocatedContainers.add(rmContainer); @@ -753,15 +776,25 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } public List pullNewlyPromotedContainers() { - return pullContainersWithUpdatedExecType(newlyPromotedContainers, + return pullNewlyUpdatedContainers(newlyPromotedContainers, ContainerUpdateType.PROMOTE_EXECUTION_TYPE); } public List pullNewlyDemotedContainers() { - return pullContainersWithUpdatedExecType(newlyDemotedContainers, + return pullNewlyUpdatedContainers(newlyDemotedContainers, ContainerUpdateType.DEMOTE_EXECUTION_TYPE); } + public List pullNewlyIncreasedContainers() { + return pullNewlyUpdatedContainers(newlyIncreasedContainers, + ContainerUpdateType.INCREASE_RESOURCE); + } + + public List pullNewlyDecreasedContainers() { + return pullNewlyUpdatedContainers(newlyDecreasedContainers, + ContainerUpdateType.DECREASE_RESOURCE); + } + public List pullUpdateContainerErrors() { List errors = new ArrayList<>(updateContainerErrors); @@ -775,11 +808,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { * GUARANTEED to OPPORTUNISTIC. * @return Newly Promoted and Demoted containers */ - private List pullContainersWithUpdatedExecType( + private List pullNewlyUpdatedContainers( Map newlyUpdatedContainers, ContainerUpdateType updateTpe) { List updatedContainers = new ArrayList<>(); - if (oppContainerContext == null) { + if (oppContainerContext == null && + (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateTpe + || ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) { return updatedContainers; } try { @@ -789,19 +824,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { while (i.hasNext()) { Map.Entry entry = i.next(); ContainerId matchedContainerId = entry.getKey(); - RMContainer rmContainer = entry.getValue(); - - // swap containers - RMContainer existingRMContainer = swapContainer( - rmContainer, matchedContainerId); - getUpdateContext().removeFromOutstandingUpdate( - rmContainer.getAllocatedSchedulerKey(), - existingRMContainer.getContainer()); - Container updatedContainer = updateContainerAndNMToken( - existingRMContainer, updateTpe); - updatedContainers.add(updatedContainer); - - tempContainerToKill.add(rmContainer); + RMContainer tempRMContainer = entry.getValue(); + + RMContainer existingRMContainer = + getRMContainer(matchedContainerId); + if (existingRMContainer != null) { + // swap containers + existingRMContainer = getUpdateContext().swapContainer( + tempRMContainer, existingRMContainer, updateTpe); + getUpdateContext().removeFromOutstandingUpdate( + tempRMContainer.getAllocatedSchedulerKey(), + existingRMContainer.getContainer()); + Container updatedContainer = updateContainerAndNMToken( + existingRMContainer, updateTpe); + updatedContainers.add(updatedContainer); + } + tempContainerToKill.add(tempRMContainer); i.remove(); } // Release all temporary containers @@ -823,68 +861,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } } - private RMContainer swapContainer(RMContainer rmContainer, ContainerId - matchedContainerId) { - RMContainer existingRMContainer = - getRMContainer(matchedContainerId); - if (existingRMContainer != null) { - // Swap updated container with the existing container - Container updatedContainer = rmContainer.getContainer(); - - Container newContainer = Container.newInstance(matchedContainerId, - existingRMContainer.getContainer().getNodeId(), - existingRMContainer.getContainer().getNodeHttpAddress(), - updatedContainer.getResource(), - existingRMContainer.getContainer().getPriority(), null, - updatedContainer.getExecutionType()); - newContainer.setAllocationRequestId( - existingRMContainer.getContainer().getAllocationRequestId()); - newContainer.setVersion(existingRMContainer.getContainer().getVersion()); - - rmContainer.getContainer().setResource( - existingRMContainer.getContainer().getResource()); - rmContainer.getContainer().setExecutionType( - existingRMContainer.getContainer().getExecutionType()); - - ((RMContainerImpl)existingRMContainer).setContainer(newContainer); - } - return existingRMContainer; - } - - private List pullNewlyUpdatedContainers( - Map updatedContainerMap, boolean increase) { - try { - writeLock.lock(); - List returnContainerList = new ArrayList ( - updatedContainerMap.size()); - - Iterator> i = - updatedContainerMap.entrySet().iterator(); - while (i.hasNext()) { - RMContainer rmContainer = i.next().getValue(); - Container updatedContainer = updateContainerAndNMToken(rmContainer, - increase ? ContainerUpdateType.INCREASE_RESOURCE : - ContainerUpdateType.DECREASE_RESOURCE); - if (updatedContainer != null) { - returnContainerList.add(updatedContainer); - i.remove(); - } - } - return returnContainerList; - } finally { - writeLock.unlock(); - } - - } - - public List pullNewlyIncreasedContainers() { - return pullNewlyUpdatedContainers(newlyIncreasedContainers, true); - } - - public List pullNewlyDecreasedContainers() { - return pullNewlyUpdatedContainers(newlyDecreasedContainers, false); - } - public List pullUpdatedNMTokens() { try { writeLock.lock(); @@ -1252,68 +1228,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public ResourceUsage getSchedulingResourceUsage() { return attemptResourceUsage; } - - public boolean removeIncreaseRequest(NodeId nodeId, - SchedulerRequestKey schedulerKey, ContainerId containerId) { - try { - writeLock.lock(); - return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey, - containerId); - } finally { - writeLock.unlock(); - } - } - - public boolean updateIncreaseRequests( - List increaseRequests) { - try { - writeLock.lock(); - return appSchedulingInfo.updateIncreaseRequests(increaseRequests); - } finally { - writeLock.unlock(); - } - } - - private void changeContainerResource( - SchedContainerChangeRequest changeRequest, boolean increase) { - try { - writeLock.lock(); - if (increase) { - appSchedulingInfo.increaseContainer(changeRequest); - } else{ - appSchedulingInfo.decreaseContainer(changeRequest); - } - - RMContainer changedRMContainer = changeRequest.getRMContainer(); - changedRMContainer.handle( - new RMContainerChangeResourceEvent(changeRequest.getContainerId(), - changeRequest.getTargetCapacity(), increase)); - - // remove pending and not pulled by AM newly-increased or - // decreased-containers and add the new one - if (increase) { - newlyDecreasedContainers.remove(changeRequest.getContainerId()); - newlyIncreasedContainers.put(changeRequest.getContainerId(), - changedRMContainer); - } else{ - newlyIncreasedContainers.remove(changeRequest.getContainerId()); - newlyDecreasedContainers.put(changeRequest.getContainerId(), - changedRMContainer); - } - } finally { - writeLock.unlock(); - } - } - - public void decreaseContainer( - SchedContainerChangeRequest decreaseRequest) { - changeContainerResource(decreaseRequest, false); - } - - public void increaseContainer( - SchedContainerChangeRequest increaseRequest) { - changeContainerResource(increaseRequest, true); - } public void setAppAMNodePartitionName(String partitionName) { this.appAMNodePartitionName = partitionName; http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 9c2dff3..db17b42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -180,49 +180,6 @@ public abstract class SchedulerNode { } /** - * Change the resources allocated for a container. - * @param containerId Identifier of the container to change. - * @param deltaResource Change in the resource allocation. - * @param increase True if the change is an increase of allocation. - */ - protected synchronized void changeContainerResource(ContainerId containerId, - Resource deltaResource, boolean increase) { - if (increase) { - deductUnallocatedResource(deltaResource); - } else { - addUnallocatedResource(deltaResource); - } - - if (LOG.isDebugEnabled()) { - LOG.debug((increase ? "Increased" : "Decreased") + " container " - + containerId + " of capacity " + deltaResource + " on host " - + rmNode.getNodeAddress() + ", which has " + numContainers - + " containers, " + getAllocatedResource() + " used and " - + getUnallocatedResource() + " available after allocation"); - } - } - - /** - * Increase the resources allocated to a container. - * @param containerId Identifier of the container to change. - * @param deltaResource Increase of resource allocation. - */ - public synchronized void increaseContainer(ContainerId containerId, - Resource deltaResource) { - changeContainerResource(containerId, deltaResource, true); - } - - /** - * Decrease the resources allocated to a container. - * @param containerId Identifier of the container to change. - * @param deltaResource Decrease of resource allocation. - */ - public synchronized void decreaseContainer(ContainerId containerId, - Resource deltaResource) { - changeContainerResource(containerId, deltaResource, false); - } - - /** * Get unallocated resources on the node. * @return Unallocated resources on the node */ @@ -280,7 +237,6 @@ public abstract class SchedulerNode { if (info == null) { return; } - if (!releasedByNode && info.launchedOnNode) { // wait until node reports container has completed return; http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index e9ef319..aa60c9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -453,14 +453,13 @@ public abstract class AbstractCSQueue implements CSQueue { } void allocateResource(Resource clusterResource, - Resource resource, String nodePartition, boolean changeContainerResource) { + Resource resource, String nodePartition) { try { writeLock.lock(); queueUsage.incUsed(nodePartition, resource); - if (!changeContainerResource) { - ++numContainers; - } + ++numContainers; + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, minimumAllocation, this, labelManager, nodePartition); } finally { @@ -469,7 +468,7 @@ public abstract class AbstractCSQueue implements CSQueue { } protected void releaseResource(Resource clusterResource, - Resource resource, String nodePartition, boolean changeContainerResource) { + Resource resource, String nodePartition) { try { writeLock.lock(); queueUsage.decUsed(nodePartition, resource); @@ -477,9 +476,7 @@ public abstract class AbstractCSQueue implements CSQueue { CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, minimumAllocation, this, labelManager, nodePartition); - if (!changeContainerResource) { - --numContainers; - } + --numContainers; } finally { writeLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index a65b3d2..6d30386 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -231,14 +231,6 @@ public interface CSQueue extends SchedulerQueue { boolean sortQueues); /** - * We have a reserved increased container in the queue, we need to unreserve - * it. Since we just want to cancel the reserved increase request instead of - * stop the container, we shouldn't call completedContainer for such purpose. - */ - public void unreserveIncreasedContainer(Resource clusterResource, - FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer); - - /** * Get the number of applications in the queue. * @return number of applications */ @@ -333,13 +325,6 @@ public interface CSQueue extends SchedulerQueue { * new resource asked */ public void decPendingResource(String nodeLabel, Resource resourceToDec); - - /** - * Decrease container resource in the queue - */ - public void decreaseContainer(Resource clusterResource, - SchedContainerChangeRequest decreaseRequest, - FiCaSchedulerApp app) throws InvalidResourceRequestException; /** * Get valid Node Labels for this queue http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 3517764..20ea607 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -60,9 +59,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -85,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; @@ -99,7 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -872,43 +868,6 @@ public class CapacityScheduler extends } } - private LeafQueue updateIncreaseRequests( - List increaseRequests, FiCaSchedulerApp app) { - // When application has some pending to-be-removed resource requests, - app.removedToBeRemovedIncreaseRequests(); - - if (null == increaseRequests || increaseRequests.isEmpty()) { - return null; - } - - // Pre-process increase requests - List schedIncreaseRequests = - createSchedContainerChangeRequests(increaseRequests, true); - LeafQueue leafQueue = (LeafQueue) app.getQueue(); - - try { - /* - * Acquire application's lock here to make sure application won't - * finish when updateIncreaseRequest is called. - */ - app.getWriteLock().lock(); - // make sure we aren't stopping/removing the application - // when the allocate comes in - if (app.isStopped()) { - return null; - } - // Process increase resource requests - if (app.updateIncreaseRequests(schedIncreaseRequests)) { - return leafQueue; - } - } finally { - app.getWriteLock().unlock(); - } - - - return null; - } - @Override @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, @@ -920,21 +879,13 @@ public class CapacityScheduler extends return EMPTY_ALLOCATION; } - // Handle promotions and demotions - handleExecutionTypeUpdates( - application, updateRequests.getPromotionRequests(), - updateRequests.getDemotionRequests()); + // Handle all container updates + handleContainerUpdates(application, updateRequests); // Release containers releaseContainers(release, application); - // update increase requests - LeafQueue updateDemandForQueue = - updateIncreaseRequests(updateRequests.getIncreaseRequests(), - application); - - // Decrease containers - decreaseContainers(updateRequests.getDecreaseRequests(), application); + LeafQueue updateDemandForQueue = null; // Sanity check for new allocation requests normalizeRequests(ask); @@ -959,8 +910,7 @@ public class CapacityScheduler extends } // Update application requests - if (application.updateResourceRequests(ask) && (updateDemandForQueue - == null)) { + if (application.updateResourceRequests(ask)) { updateDemandForQueue = (LeafQueue) application.getQueue(); } @@ -1466,7 +1416,7 @@ public class CapacityScheduler extends (ContainerExpiredSchedulerEvent) event; ContainerId containerId = containerExpiredEvent.getContainerId(); if (containerExpiredEvent.isIncrease()) { - rollbackContainerResource(containerId); + rollbackContainerUpdate(containerId); } else { completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus( @@ -1618,31 +1568,6 @@ public class CapacityScheduler extends } } - private void rollbackContainerResource( - ContainerId containerId) { - RMContainer rmContainer = getRMContainer(containerId); - if (rmContainer == null) { - LOG.info("Cannot rollback resource for container " + containerId - + ". The container does not exist."); - return; - } - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Cannot rollback resource for container " + containerId - + ". The application that the container " - + "belongs to does not exist."); - return; - } - LOG.info("Roll back resource for container " + containerId); - - SchedulerNode schedulerNode = getSchedulerNode( - rmContainer.getAllocatedNode()); - SchedContainerChangeRequest decreaseRequest = - new SchedContainerChangeRequest(this.rmContext, schedulerNode, - rmContainer, rmContainer.getLastConfirmedResource()); - decreaseContainer(decreaseRequest, application); - } - @Override protected void completedContainerInternal( RMContainer rmContainer, ContainerStatus containerStatus, @@ -1676,32 +1601,6 @@ public class CapacityScheduler extends rmContainer, containerStatus, event, null, true); } - @Override - protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest, - SchedulerApplicationAttempt attempt) { - RMContainer rmContainer = decreaseRequest.getRMContainer(); - // Check container status before doing decrease - if (rmContainer.getState() != RMContainerState.RUNNING) { - LOG.info( - "Trying to decrease a container not in RUNNING state, container=" - + rmContainer + " state=" + rmContainer.getState().name()); - return; - } - FiCaSchedulerApp app = (FiCaSchedulerApp) attempt; - LeafQueue queue = (LeafQueue) attempt.getQueue(); - try { - queue.decreaseContainer(getClusterResource(), decreaseRequest, app); - // Notify RMNode that the container can be pulled by NodeManager in the - // next heartbeat - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(), - Collections.singletonList(rmContainer.getContainer()))); - } catch (InvalidResourceRequestException e) { - LOG.warn("Error happens when checking decrease request, Ignoring.." - + " exception=", e); - } - } - @Lock(Lock.NoLock.class) @VisibleForTesting @Override @@ -2386,8 +2285,8 @@ public class CapacityScheduler extends getSchedulerContainer(rmContainer, true), getSchedulerContainersToRelease(csAssignment), getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), - false), csAssignment.isIncreasedAllocation(), - csAssignment.getType(), csAssignment.getRequestLocalityType(), + false), csAssignment.getType(), + csAssignment.getRequestLocalityType(), csAssignment.getSchedulingMode() != null ? csAssignment.getSchedulingMode() : SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, @@ -2403,8 +2302,8 @@ public class CapacityScheduler extends getSchedulerContainer(rmContainer, false), getSchedulerContainersToRelease(csAssignment), getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), - false), csAssignment.isIncreasedAllocation(), - csAssignment.getType(), csAssignment.getRequestLocalityType(), + false), csAssignment.getType(), + csAssignment.getRequestLocalityType(), csAssignment.getSchedulingMode() != null ? csAssignment.getSchedulingMode() : SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org