Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA86018D30 for ; Wed, 30 Dec 2015 23:42:06 +0000 (UTC) Received: (qmail 25739 invoked by uid 500); 30 Dec 2015 23:42:06 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 25530 invoked by uid 500); 30 Dec 2015 23:42:06 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 25355 invoked by uid 99); 30 Dec 2015 23:42:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Dec 2015 23:42:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3E65BE04AF; Wed, 30 Dec 2015 23:42:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangda@apache.org To: common-commits@hadoop.apache.org Date: Wed, 30 Dec 2015 23:42:08 -0000 Message-Id: <046b9aa51d764c5790bf3c18dbb558dc@git.apache.org> In-Reply-To: <2000951788864810bd3fb7eef49a7961@git.apache.org> References: <2000951788864810bd3fb7eef49a7961@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] hadoop git commit: YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda) YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05fa852d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05fa852d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05fa852d Branch: refs/heads/branch-2 Commit: 05fa852d7567b7590d6b53bbf925f8f424736514 Parents: 6eefae1 Author: Wangda Tan Authored: Wed Dec 30 15:36:55 2015 -0800 Committer: Wangda Tan Committed: Wed Dec 30 15:36:55 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../scheduler/AppSchedulingInfo.java | 379 +++++++++---------- .../scheduler/SchedulerApplicationAttempt.java | 2 +- 3 files changed, 174 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c208c17..146ed62 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -26,6 +26,8 @@ Release 2.9.0 - UNRELEASED YARN-4522. Queue acl can be checked at app submission. (Jian He via wangda) + YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index c5f8cd1..41d3fd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -42,7 +43,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; @@ -56,40 +56,36 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class AppSchedulingInfo { private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); + private static final Comparator COMPARATOR = + new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator(); + private static final int EPOCH_BIT_SHIFT = 40; + + private final ApplicationId applicationId; private final ApplicationAttemptId applicationAttemptId; - final ApplicationId applicationId; - private String queueName; - Queue queue; - final String user; - // TODO making containerIdCounter long private final AtomicLong containerIdCounter; - private final int EPOCH_BIT_SHIFT = 40; + private final String user; + + private Queue queue; + private ActiveUsersManager activeUsersManager; + private boolean pending = true; // whether accepted/allocated by scheduler + private ResourceUsage appResourceUsage; + + private final Set amBlacklist = new HashSet<>(); + private Set userBlacklist = new HashSet<>(); - final Set priorities = new TreeSet( - new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); + final Set priorities = new TreeSet<>(COMPARATOR); final Map> resourceRequestMap = - new ConcurrentHashMap>(); - final Map>> increaseRequestMap = new ConcurrentHashMap<>(); - private Set userBlacklist = new HashSet<>(); - private Set amBlacklist = new HashSet<>(); + final Map>> containerIncreaseRequestMap = + new ConcurrentHashMap<>(); - //private final ApplicationStore store; - private ActiveUsersManager activeUsersManager; - - /* Allocated by scheduler */ - boolean pending = true; // for app metrics - - private ResourceUsage appResourceUsage; - public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, long epoch, ResourceUsage appResourceUsage) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; - this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT); @@ -104,14 +100,18 @@ public class AppSchedulingInfo { return applicationAttemptId; } - public String getQueueName() { - return queueName; - } - public String getUser() { return user; } + public long getNewContainerId() { + return this.containerIdCounter.incrementAndGet(); + } + + public synchronized String getQueueName() { + return queue.getQueueName(); + } + public synchronized boolean isPending() { return pending; } @@ -125,30 +125,23 @@ public class AppSchedulingInfo { LOG.info("Application " + applicationId + " requests cleared"); } - public long getNewContainerId() { - return this.containerIdCounter.incrementAndGet(); - } - - public boolean hasIncreaseRequest(NodeId nodeId) { + public synchronized boolean hasIncreaseRequest(NodeId nodeId) { Map> requestsOnNode = - increaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - return false; - } - return requestsOnNode.size() > 0; + containerIncreaseRequestMap.get(nodeId); + return requestsOnNode == null ? false : requestsOnNode.size() > 0; } - public Map + public synchronized Map getIncreaseRequests(NodeId nodeId, Priority priority) { Map> requestsOnNode = - increaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - return null; - } - - return requestsOnNode.get(priority); + containerIncreaseRequestMap.get(nodeId); + return requestsOnNode == null ? null : requestsOnNode.get(priority); } + /** + * return true if any of the existing increase requests are updated, + * false if none of them are updated + */ public synchronized boolean updateIncreaseRequests( List increaseRequests) { boolean resourceUpdated = false; @@ -157,10 +150,10 @@ public class AppSchedulingInfo { NodeId nodeId = r.getRMContainer().getAllocatedNode(); Map> requestsOnNode = - increaseRequestMap.get(nodeId); + containerIncreaseRequestMap.get(nodeId); if (null == requestsOnNode) { requestsOnNode = new TreeMap<>(); - increaseRequestMap.put(nodeId, requestsOnNode); + containerIncreaseRequestMap.put(nodeId, requestsOnNode); } SchedContainerChangeRequest prevChangeRequest = @@ -168,22 +161,21 @@ public class AppSchedulingInfo { if (null != prevChangeRequest) { if (Resources.equals(prevChangeRequest.getTargetCapacity(), r.getTargetCapacity())) { - // New target capacity is as same as what we have, just ignore the new - // one + // increase request hasn't changed continue; } - // remove the old one + // remove the old one, as we will use the new one going forward removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(), prevChangeRequest.getContainerId()); } - if (Resources.equals(r.getTargetCapacity(), r.getRMContainer().getAllocatedResource())) { + if (Resources.equals(r.getTargetCapacity(), + r.getRMContainer().getAllocatedResource())) { if (LOG.isDebugEnabled()) { - LOG.debug("Trying to increase/decrease container, " - + "target capacity = previous capacity = " + prevChangeRequest - + " for container=" + r.getContainerId() - + ". Will ignore this increase request"); + LOG.debug("Trying to increase container " + r.getContainerId() + + ", target capacity = previous capacity = " + prevChangeRequest + + ". Will ignore this increase request."); } continue; } @@ -195,25 +187,26 @@ public class AppSchedulingInfo { return resourceUpdated; } - // insert increase request and add missing hierarchy if missing + /** + * Insert increase request, adding any missing items in the data-structure + * hierarchy. + */ private void insertIncreaseRequest(SchedContainerChangeRequest request) { NodeId nodeId = request.getNodeId(); Priority priority = request.getPriority(); ContainerId containerId = request.getContainerId(); Map> requestsOnNode = - increaseRequestMap.get(nodeId); + containerIncreaseRequestMap.get(nodeId); if (null == requestsOnNode) { - requestsOnNode = - new HashMap>(); - increaseRequestMap.put(nodeId, requestsOnNode); + requestsOnNode = new HashMap<>(); + containerIncreaseRequestMap.put(nodeId, requestsOnNode); } Map requestsOnNodeWithPriority = requestsOnNode.get(priority); if (null == requestsOnNodeWithPriority) { - requestsOnNodeWithPriority = - new TreeMap(); + requestsOnNodeWithPriority = new TreeMap<>(); requestsOnNode.put(priority, requestsOnNodeWithPriority); } @@ -237,7 +230,7 @@ public class AppSchedulingInfo { public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority, ContainerId containerId) { Map> requestsOnNode = - increaseRequestMap.get(nodeId); + containerIncreaseRequestMap.get(nodeId); if (null == requestsOnNode) { return false; } @@ -256,7 +249,7 @@ public class AppSchedulingInfo { requestsOnNode.remove(priority); } if (requestsOnNode.isEmpty()) { - increaseRequestMap.remove(nodeId); + containerIncreaseRequestMap.remove(nodeId); } if (request == null) { @@ -279,18 +272,15 @@ public class AppSchedulingInfo { public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, Priority priority, ContainerId containerId) { Map> requestsOnNode = - increaseRequestMap.get(nodeId); + containerIncreaseRequestMap.get(nodeId); if (null == requestsOnNode) { return null; } Map requestsOnNodeWithPriority = requestsOnNode.get(priority); - if (null == requestsOnNodeWithPriority) { - return null; - } - - return requestsOnNodeWithPriority.get(containerId); + return requestsOnNodeWithPriority == null ? null + : requestsOnNodeWithPriority.get(containerId); } /** @@ -299,121 +289,120 @@ public class AppSchedulingInfo { * by the application. * * @param requests resources to be acquired - * @param recoverPreemptedRequest recover Resource Request on preemption - * @return true if any resource was updated, false else + * @param recoverPreemptedRequest recover ResourceRequest on preemption + * @return true if any resource was updated, false otherwise */ - synchronized public boolean updateResourceRequests( + public synchronized boolean updateResourceRequests( List requests, boolean recoverPreemptedRequest) { - QueueMetrics metrics = queue.getMetrics(); - + // Flag to track if any incoming requests update "ANY" requests boolean anyResourcesUpdated = false; // Update resource requests for (ResourceRequest request : requests) { Priority priority = request.getPriority(); String resourceName = request.getResourceName(); - boolean updatePendingResources = false; - ResourceRequest lastRequest = null; - if (resourceName.equals(ResourceRequest.ANY)) { - if (LOG.isDebugEnabled()) { - LOG.debug("update:" + " application=" + applicationId + " request=" - + request); - } - updatePendingResources = true; - anyResourcesUpdated = true; - - // Premature optimization? - // Assumes that we won't see more than one priority request updated - // in one call, reasonable assumption... however, it's totally safe - // to activate same application more than once. - // Thus we don't need another loop ala the one in decrementOutstanding() - // which is needed during deactivate. - if (request.getNumContainers() > 0) { - activeUsersManager.activateApplication(user, applicationId); - } - ResourceRequest previousAnyRequest = - getResourceRequest(priority, resourceName); - - // When there is change in ANY request label expression, we should - // update label for all resource requests already added of same - // priority as ANY resource request. - if ((null == previousAnyRequest) - || isRequestLabelChanged(previousAnyRequest, request)) { - Map resourceRequest = - getResourceRequests(priority); - if (resourceRequest != null) { - for (ResourceRequest r : resourceRequest.values()) { - if (!r.getResourceName().equals(ResourceRequest.ANY)) { - r.setNodeLabelExpression(request.getNodeLabelExpression()); - } - } - } - } - } else { - ResourceRequest anyRequest = - getResourceRequest(priority, ResourceRequest.ANY); - if (anyRequest != null) { - request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); - } - } + // Update node labels if required + updateNodeLabels(request); Map asks = this.resourceRequestMap.get(priority); - if (asks == null) { - asks = new ConcurrentHashMap(); + asks = new ConcurrentHashMap<>(); this.resourceRequestMap.put(priority, asks); this.priorities.add(priority); } - lastRequest = asks.get(resourceName); + // Increment number of containers if recovering preempted resources + ResourceRequest lastRequest = asks.get(resourceName); if (recoverPreemptedRequest && lastRequest != null) { - // Increment the number of containers to 1, as it is recovering a - // single container. request.setNumContainers(lastRequest.getNumContainers() + 1); } + // Update asks asks.put(resourceName, request); - if (updatePendingResources) { - - // Similarly, deactivate application? - if (request.getNumContainers() <= 0) { - LOG.info("checking for deactivate of application :" - + this.applicationId); - checkForDeactivation(); - } - - int lastRequestContainers = lastRequest != null ? lastRequest - .getNumContainers() : 0; - Resource lastRequestCapability = lastRequest != null ? lastRequest - .getCapability() : Resources.none(); - metrics.incrPendingResources(user, request.getNumContainers(), - request.getCapability()); - metrics.decrPendingResources(user, lastRequestContainers, - lastRequestCapability); - - // update queue: - Resource increasedResource = - Resources.multiply(request.getCapability(), - request.getNumContainers()); - queue.incPendingResource(request.getNodeLabelExpression(), - increasedResource); - appResourceUsage.incPending(request.getNodeLabelExpression(), - increasedResource); - if (lastRequest != null) { - Resource decreasedResource = - Resources.multiply(lastRequestCapability, lastRequestContainers); - queue.decPendingResource(lastRequest.getNodeLabelExpression(), - decreasedResource); - appResourceUsage.decPending(lastRequest.getNodeLabelExpression(), - decreasedResource); + + if (resourceName.equals(ResourceRequest.ANY)) { + anyResourcesUpdated = true; + + // Activate application. Metrics activation is done here. + // TODO: Shouldn't we activate even if numContainers = 0? + if (request.getNumContainers() > 0) { + activeUsersManager.activateApplication(user, applicationId); } + + // Update pendingResources + updatePendingResources(lastRequest, request, queue.getMetrics()); } } return anyResourcesUpdated; } - private boolean isRequestLabelChanged(ResourceRequest requestOne, + private void updatePendingResources(ResourceRequest lastRequest, + ResourceRequest request, QueueMetrics metrics) { + if (request.getNumContainers() <= 0) { + LOG.info("checking for deactivate of application :" + + this.applicationId); + checkForDeactivation(); + } + + int lastRequestContainers = + (lastRequest != null) ? lastRequest.getNumContainers() : 0; + Resource lastRequestCapability = + lastRequest != null ? lastRequest.getCapability() : Resources.none(); + metrics.incrPendingResources(user, + request.getNumContainers(), request.getCapability()); + metrics.decrPendingResources(user, + lastRequestContainers, lastRequestCapability); + + // update queue: + Resource increasedResource = + Resources.multiply(request.getCapability(), request.getNumContainers()); + queue.incPendingResource(request.getNodeLabelExpression(), + increasedResource); + appResourceUsage.incPending(request.getNodeLabelExpression(), + increasedResource); + if (lastRequest != null) { + Resource decreasedResource = + Resources.multiply(lastRequestCapability, lastRequestContainers); + queue.decPendingResource(lastRequest.getNodeLabelExpression(), + decreasedResource); + appResourceUsage.decPending(lastRequest.getNodeLabelExpression(), + decreasedResource); + } + } + + private void updateNodeLabels(ResourceRequest request) { + Priority priority = request.getPriority(); + String resourceName = request.getResourceName(); + if (resourceName.equals(ResourceRequest.ANY)) { + ResourceRequest previousAnyRequest = + getResourceRequest(priority, resourceName); + + // When there is change in ANY request label expression, we should + // update label for all resource requests already added of same + // priority as ANY resource request. + if ((null == previousAnyRequest) + || hasRequestLabelChanged(previousAnyRequest, request)) { + Map resourceRequest = + getResourceRequests(priority); + if (resourceRequest != null) { + for (ResourceRequest r : resourceRequest.values()) { + if (!r.getResourceName().equals(ResourceRequest.ANY)) { + r.setNodeLabelExpression(request.getNodeLabelExpression()); + } + } + } + } + } else { + ResourceRequest anyRequest = + getResourceRequest(priority, ResourceRequest.ANY); + if (anyRequest != null) { + request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); + } + } + } + + private boolean hasRequestLabelChanged(ResourceRequest requestOne, ResourceRequest requestTwo) { String requestOneLabelExp = requestOne.getNodeLabelExpression(); String requestTwoLabelExp = requestTwo.getNodeLabelExpression(); @@ -465,24 +454,24 @@ public class AppSchedulingInfo { } } - synchronized public Collection getPriorities() { + public synchronized Collection getPriorities() { return priorities; } - synchronized public Map getResourceRequests( + public synchronized Map getResourceRequests( Priority priority) { return resourceRequestMap.get(priority); } - public List getAllResourceRequests() { - List ret = new ArrayList(); + public synchronized List getAllResourceRequests() { + List ret = new ArrayList<>(); for (Map r : resourceRequestMap.values()) { ret.addAll(r.values()); } return ret; } - synchronized public ResourceRequest getResourceRequest(Priority priority, + public synchronized ResourceRequest getResourceRequest(Priority priority, String resourceName) { Map nodeRequests = resourceRequestMap.get(priority); return (nodeRequests == null) ? null : nodeRequests.get(resourceName); @@ -511,7 +500,7 @@ public class AppSchedulingInfo { } } } - + public synchronized void increaseContainer( SchedContainerChangeRequest increaseRequest) { NodeId nodeId = increaseRequest.getNodeId(); @@ -559,28 +548,17 @@ public class AppSchedulingInfo { /** * Resources have been allocated to this application by the resource * scheduler. Track them. - * - * @param type - * the type of the node - * @param node - * the nodeinfo of the node - * @param priority - * the priority of the request. - * @param request - * the request - * @param container - * the containers allocated. */ - synchronized public List allocate(NodeType type, + public synchronized List allocate(NodeType type, SchedulerNode node, Priority priority, ResourceRequest request, - Container container) { - List resourceRequests = new ArrayList(); + Container containerAllocated) { + List resourceRequests = new ArrayList<>(); if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, priority, request, container, resourceRequests); + allocateNodeLocal(node, priority, request, resourceRequests); } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, priority, request, container, resourceRequests); + allocateRackLocal(node, priority, request, resourceRequests); } else { - allocateOffSwitch(node, priority, request, container, resourceRequests); + allocateOffSwitch(request, resourceRequests); } QueueMetrics metrics = queue.getMetrics(); if (pending) { @@ -592,8 +570,8 @@ public class AppSchedulingInfo { if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationId=" + applicationId - + " container=" + container.getId() - + " host=" + container.getNodeId().toString() + + " container=" + containerAllocated.getId() + + " host=" + containerAllocated.getNodeId().toString() + " user=" + user + " resource=" + request.getCapability() + " type=" + type); @@ -606,12 +584,9 @@ public class AppSchedulingInfo { /** * The {@link ResourceScheduler} is allocating data-local resources to the * application. - * - * @param allocatedContainers - * resources allocated to the application */ - synchronized private void allocateNodeLocal(SchedulerNode node, - Priority priority, ResourceRequest nodeLocalRequest, Container container, + private synchronized void allocateNodeLocal(SchedulerNode node, + Priority priority, ResourceRequest nodeLocalRequest, List resourceRequests) { // Update future requirements decResourceRequest(node.getNodeName(), priority, nodeLocalRequest); @@ -641,12 +616,9 @@ public class AppSchedulingInfo { /** * The {@link ResourceScheduler} is allocating data-local resources to the * application. - * - * @param allocatedContainers - * resources allocated to the application */ - synchronized private void allocateRackLocal(SchedulerNode node, - Priority priority, ResourceRequest rackLocalRequest, Container container, + private synchronized void allocateRackLocal(SchedulerNode node, + Priority priority, ResourceRequest rackLocalRequest, List resourceRequests) { // Update future requirements decResourceRequest(node.getRackName(), priority, rackLocalRequest); @@ -663,20 +635,16 @@ public class AppSchedulingInfo { /** * The {@link ResourceScheduler} is allocating data-local resources to the * application. - * - * @param allocatedContainers - * resources allocated to the application */ - synchronized private void allocateOffSwitch(SchedulerNode node, - Priority priority, ResourceRequest offSwitchRequest, Container container, - List resourceRequests) { + private synchronized void allocateOffSwitch( + ResourceRequest offSwitchRequest, List resourceRequests) { // Update future requirements decrementOutstanding(offSwitchRequest); // Update cloned OffRack requests for recovery resourceRequests.add(cloneResourceRequest(offSwitchRequest)); } - synchronized private void decrementOutstanding( + private synchronized void decrementOutstanding( ResourceRequest offSwitchRequest) { int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; @@ -695,7 +663,7 @@ public class AppSchedulingInfo { offSwitchRequest.getCapability()); } - synchronized private void checkForDeactivation() { + private synchronized void checkForDeactivation() { boolean deactivate = true; for (Priority priority : getPriorities()) { ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); @@ -709,7 +677,7 @@ public class AppSchedulingInfo { // also we need to check increase request if (!deactivate) { - deactivate = increaseRequestMap.isEmpty(); + deactivate = containerIncreaseRequestMap.isEmpty(); } if (deactivate) { @@ -717,7 +685,7 @@ public class AppSchedulingInfo { } } - synchronized public void move(Queue newQueue) { + public synchronized void move(Queue newQueue) { QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics(); for (Map asks : resourceRequestMap.values()) { @@ -741,10 +709,9 @@ public class AppSchedulingInfo { activeUsersManager = newQueue.getActiveUsersManager(); activeUsersManager.activateApplication(user, applicationId); this.queue = newQueue; - this.queueName = newQueue.getQueueName(); } - synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { + public synchronized void stop() { // clear pending resources metrics for the application QueueMetrics metrics = queue.getMetrics(); for (Map asks : resourceRequestMap.values()) { @@ -782,12 +749,8 @@ public class AppSchedulingInfo { public synchronized void transferStateFromPreviousAppSchedulingInfo( AppSchedulingInfo appInfo) { - // this.priorities = appInfo.getPriorities(); - // this.requests = appInfo.getRequests(); // This should not require locking the userBlacklist since it will not be // used by this instance until after setCurrentAppAttempt. - // Should cleanup this to avoid sharing between instances and can - // then remove getBlacklist as well. this.userBlacklist = appInfo.getBlackList(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 09f3598..4d81350 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -331,7 +331,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { // Cleanup all scheduling information isStopped = true; - appSchedulingInfo.stop(rmAppAttemptFinalState); + appSchedulingInfo.stop(); } public synchronized boolean isStopped() {