Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3D5F5200B84 for ; Tue, 20 Sep 2016 09:03:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3BFAC160AD6; Tue, 20 Sep 2016 07:03:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 950FA160AA9 for ; Tue, 20 Sep 2016 09:03:49 +0200 (CEST) Received: (qmail 49760 invoked by uid 500); 20 Sep 2016 07:03:48 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 49743 invoked by uid 99); 20 Sep 2016 07:03:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Sep 2016 07:03:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2BF2FE09B3; Tue, 20 Sep 2016 07:03:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Tue, 20 Sep 2016 07:03:48 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] hadoop git commit: YARN-3140. Improve locks in AbstractCSQueue/LeafQueue/ParentQueue. Contributed by Wangda Tan archived-at: Tue, 20 Sep 2016 07:03:52 -0000 Repository: hadoop Updated Branches: refs/heads/trunk e52d6e7a4 -> 2b66d9ec5 http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 3e9785f..ffb6892 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -107,68 +107,77 @@ public class ParentQueue extends AbstractCSQueue { ", fullname=" + getQueuePath()); } - synchronized void setupQueueConfigs(Resource clusterResource) + void setupQueueConfigs(Resource clusterResource) throws IOException { - super.setupQueueConfigs(clusterResource); - StringBuilder aclsString = new StringBuilder(); - for (Map.Entry e : acls.entrySet()) { - aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); - } + try { + writeLock.lock(); + super.setupQueueConfigs(clusterResource); + StringBuilder aclsString = new StringBuilder(); + for (Map.Entry e : acls.entrySet()) { + aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); + } - StringBuilder labelStrBuilder = new StringBuilder(); - if (accessibleLabels != null) { - for (String s : accessibleLabels) { - labelStrBuilder.append(s); - labelStrBuilder.append(","); + StringBuilder labelStrBuilder = new StringBuilder(); + if (accessibleLabels != null) { + for (String s : accessibleLabels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } } - } - LOG.info(queueName + - ", capacity=" + this.queueCapacities.getCapacity() + - ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + - ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + - ", absoluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() + - ", state=" + state + - ", acls=" + aclsString + - ", labels=" + labelStrBuilder.toString() + "\n" + - ", reservationsContinueLooking=" + reservationsContinueLooking); + LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity() + + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + + ", absoluteMaxCapacity=" + this.queueCapacities + .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + + ", reservationsContinueLooking=" + reservationsContinueLooking); + } finally { + writeLock.unlock(); + } } private static float PRECISION = 0.0005f; // 0.05% precision - synchronized void setChildQueues(Collection childQueues) { - // Validate - float childCapacities = 0; - for (CSQueue queue : childQueues) { - childCapacities += queue.getCapacity(); - } - float delta = Math.abs(1.0f - childCapacities); // crude way to check - // allow capacities being set to 0, and enforce child 0 if parent is 0 - if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || - ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { - throw new IllegalArgumentException("Illegal" + - " capacity of " + childCapacities + - " for children of queue " + queueName); - } - // check label capacities - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - float capacityByLabel = queueCapacities.getCapacity(nodeLabel); - // check children's labels - float sum = 0; + + void setChildQueues(Collection childQueues) { + try { + writeLock.lock(); + // Validate + float childCapacities = 0; for (CSQueue queue : childQueues) { - sum += queue.getQueueCapacities().getCapacity(nodeLabel); + childCapacities += queue.getCapacity(); } - if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) - || (capacityByLabel == 0) && (sum > 0)) { - throw new IllegalArgumentException("Illegal" + " capacity of " - + sum + " for children of queue " + queueName - + " for label=" + nodeLabel); + float delta = Math.abs(1.0f - childCapacities); // crude way to check + // allow capacities being set to 0, and enforce child 0 if parent is 0 + if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || ( + (queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { + throw new IllegalArgumentException( + "Illegal" + " capacity of " + childCapacities + + " for children of queue " + queueName); } - } - - this.childQueues.clear(); - this.childQueues.addAll(childQueues); - if (LOG.isDebugEnabled()) { - LOG.debug("setChildQueues: " + getChildQueuesToPrint()); + // check label capacities + for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { + float capacityByLabel = queueCapacities.getCapacity(nodeLabel); + // check children's labels + float sum = 0; + for (CSQueue queue : childQueues) { + sum += queue.getQueueCapacities().getCapacity(nodeLabel); + } + if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) + || (capacityByLabel == 0) && (sum > 0)) { + throw new IllegalArgumentException( + "Illegal" + " capacity of " + sum + " for children of queue " + + queueName + " for label=" + nodeLabel); + } + } + + this.childQueues.clear(); + this.childQueues.addAll(childQueues); + if (LOG.isDebugEnabled()) { + LOG.debug("setChildQueues: " + getChildQueuesToPrint()); + } + } finally { + writeLock.unlock(); } } @@ -179,53 +188,70 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized QueueInfo getQueueInfo( + public QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { - QueueInfo queueInfo = getQueueInfo(); - - List childQueuesInfo = new ArrayList(); - if (includeChildQueues) { - for (CSQueue child : childQueues) { - // Get queue information recursively? - childQueuesInfo.add( - child.getQueueInfo(recursive, recursive)); + try { + readLock.lock(); + QueueInfo queueInfo = getQueueInfo(); + + List childQueuesInfo = new ArrayList<>(); + if (includeChildQueues) { + for (CSQueue child : childQueues) { + // Get queue information recursively? + childQueuesInfo.add(child.getQueueInfo(recursive, recursive)); + } } + queueInfo.setChildQueues(childQueuesInfo); + + return queueInfo; + } finally { + readLock.unlock(); } - queueInfo.setChildQueues(childQueuesInfo); - - return queueInfo; + } - private synchronized QueueUserACLInfo getUserAclInfo( + private QueueUserACLInfo getUserAclInfo( UserGroupInformation user) { - QueueUserACLInfo userAclInfo = - recordFactory.newRecordInstance(QueueUserACLInfo.class); - List operations = new ArrayList(); - for (QueueACL operation : QueueACL.values()) { - if (hasAccess(operation, user)) { - operations.add(operation); - } + try { + readLock.lock(); + QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( + QueueUserACLInfo.class); + List operations = new ArrayList(); + for (QueueACL operation : QueueACL.values()) { + if (hasAccess(operation, user)) { + operations.add(operation); + } + } + + userAclInfo.setQueueName(getQueueName()); + userAclInfo.setUserAcls(operations); + return userAclInfo; + } finally { + readLock.unlock(); } - userAclInfo.setQueueName(getQueueName()); - userAclInfo.setUserAcls(operations); - return userAclInfo; } @Override - public synchronized List getQueueUserAclInfo( + public List getQueueUserAclInfo( UserGroupInformation user) { - List userAcls = new ArrayList(); - - // Add parent queue acls - userAcls.add(getUserAclInfo(user)); - - // Add children queue acls - for (CSQueue child : childQueues) { - userAcls.addAll(child.getQueueUserAclInfo(user)); + try { + readLock.lock(); + List userAcls = new ArrayList<>(); + + // Add parent queue acls + userAcls.add(getUserAclInfo(user)); + + // Add children queue acls + for (CSQueue child : childQueues) { + userAcls.addAll(child.getQueueUserAclInfo(user)); + } + + return userAcls; + } finally { + readLock.unlock(); } - - return userAcls; + } public String toString() { @@ -240,52 +266,59 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized void reinitialize(CSQueue newlyParsedQueue, + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof ParentQueue) || - !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { - throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + newlyParsedQueue.getQueuePath()); - } + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } - ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue; + ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue; - // Set new configs - setupQueueConfigs(clusterResource); + // Set new configs + setupQueueConfigs(clusterResource); - // Re-configure existing child queues and add new ones - // The CS has already checked to ensure all existing child queues are present! - Map currentChildQueues = getQueues(childQueues); - Map newChildQueues = - getQueues(newlyParsedParentQueue.childQueues); - for (Map.Entry e : newChildQueues.entrySet()) { - String newChildQueueName = e.getKey(); - CSQueue newChildQueue = e.getValue(); + // Re-configure existing child queues and add new ones + // The CS has already checked to ensure all existing child queues are present! + Map currentChildQueues = getQueues(childQueues); + Map newChildQueues = getQueues( + newlyParsedParentQueue.childQueues); + for (Map.Entry e : newChildQueues.entrySet()) { + String newChildQueueName = e.getKey(); + CSQueue newChildQueue = e.getValue(); - CSQueue childQueue = currentChildQueues.get(newChildQueueName); - - // Check if the child-queue already exists - if (childQueue != null) { - // Re-init existing child queues - childQueue.reinitialize(newChildQueue, clusterResource); - LOG.info(getQueueName() + ": re-configured queue: " + childQueue); - } else { - // New child queue, do not re-init - - // Set parent to 'this' - newChildQueue.setParent(this); - - // Save in list of current child queues - currentChildQueues.put(newChildQueueName, newChildQueue); - - LOG.info(getQueueName() + ": added new child queue: " + newChildQueue); + CSQueue childQueue = currentChildQueues.get(newChildQueueName); + + // Check if the child-queue already exists + if (childQueue != null) { + // Re-init existing child queues + childQueue.reinitialize(newChildQueue, clusterResource); + LOG.info(getQueueName() + ": re-configured queue: " + childQueue); + } else{ + // New child queue, do not re-init + + // Set parent to 'this' + newChildQueue.setParent(this); + + // Save in list of current child queues + currentChildQueues.put(newChildQueueName, newChildQueue); + + LOG.info( + getQueueName() + ": added new child queue: " + newChildQueue); + } } - } - // Re-sort all queues - childQueues.clear(); - childQueues.addAll(currentChildQueues.values()); + // Re-sort all queues + childQueues.clear(); + childQueues.addAll(currentChildQueues.values()); + } finally { + writeLock.unlock(); + } } Map getQueues(Set queues) { @@ -299,21 +332,24 @@ public class ParentQueue extends AbstractCSQueue { @Override public void submitApplication(ApplicationId applicationId, String user, String queue) throws AccessControlException { - - synchronized (this) { + + try { + writeLock.lock(); // Sanity check if (queue.equals(queueName)) { - throw new AccessControlException("Cannot submit application " + - "to non-leaf queue: " + queueName); + throw new AccessControlException( + "Cannot submit application " + "to non-leaf queue: " + queueName); } - + if (state != QueueState.RUNNING) { - throw new AccessControlException("Queue " + getQueuePath() + - " is STOPPED. Cannot accept submission of application: " + - applicationId); + throw new AccessControlException("Queue " + getQueuePath() + + " is STOPPED. Cannot accept submission of application: " + + applicationId); } addApplication(applicationId, user); + } finally { + writeLock.unlock(); } // Inform the parent queue @@ -342,24 +378,26 @@ public class ParentQueue extends AbstractCSQueue { // finish attempt logic. } - private synchronized void addApplication(ApplicationId applicationId, + private void addApplication(ApplicationId applicationId, String user) { - ++numApplications; + try { + writeLock.lock(); + ++numApplications; - LOG.info("Application added -" + - " appId: " + applicationId + - " user: " + user + - " leaf-queue of parent: " + getQueueName() + - " #applications: " + getNumApplications()); + LOG.info( + "Application added -" + " appId: " + applicationId + " user: " + user + + " leaf-queue of parent: " + getQueueName() + " #applications: " + + getNumApplications()); + } finally { + writeLock.unlock(); + } } @Override public void finishApplication(ApplicationId application, String user) { - - synchronized (this) { - removeApplication(application, user); - } + + removeApplication(application, user); // Inform the parent queue if (parent != null) { @@ -367,16 +405,18 @@ public class ParentQueue extends AbstractCSQueue { } } - private synchronized void removeApplication(ApplicationId applicationId, + private void removeApplication(ApplicationId applicationId, String user) { - - --numApplications; - - LOG.info("Application removed -" + - " appId: " + applicationId + - " user: " + user + - " leaf-queue of parent: " + getQueueName() + - " #applications: " + getNumApplications()); + try { + writeLock.lock(); + --numApplications; + + LOG.info("Application removed -" + " appId: " + applicationId + " user: " + + user + " leaf-queue of parent: " + getQueueName() + + " #applications: " + getNumApplications()); + } finally { + writeLock.unlock(); + } } private String getParentName() { @@ -384,183 +424,181 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized CSAssignment assignContainers(Resource clusterResource, + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { - // if our queue cannot access this node, just return - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + node - .getPartition()); - } - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.REJECTED, - ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + try { + writeLock.lock(); + // if our queue cannot access this node, just return + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY + && !accessibleToPartition(node.getPartition())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it is not able to access partition=" + node .getPartition()); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } + } - return CSAssignment.NULL_ASSIGNMENT; - } - - // Check if this queue need more resource, simply skip allocation if this - // queue doesn't need more resources. - if (!super.hasPendingResourceRequest(node.getPartition(), - clusterResource, schedulingMode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node.getPartition()); - } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); + return CSAssignment.NULL_ASSIGNMENT; } - return CSAssignment.NULL_ASSIGNMENT; - } - - CSAssignment assignment = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - while (canAssign(clusterResource, node)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to assign containers to child-queue of " - + getQueueName()); - } - - // Are we over maximum-capacity for this queue? - // This will also consider parent's limits and also continuous reservation - // looking - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), - resourceLimits, Resources.createResource( - getMetrics().getReservedMB(), getMetrics() - .getReservedVirtualCores()), schedulingMode)) { + // Check if this queue need more resource, simply skip allocation if this + // queue doesn't need more resources. + if (!super.hasPendingResourceRequest(node.getPartition(), clusterResource, + schedulingMode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-partition=" + node + .getPartition()); + } ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); if (rootQueue) { ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, node); } - break; + return CSAssignment.NULL_ASSIGNMENT; } - // Schedule - CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, resourceLimits, - schedulingMode); - assignment.setType(assignedToChild.getType()); - - // Done if no child-queue assigned anything - if (Resources.greaterThan( - resourceCalculator, clusterResource, - assignedToChild.getResource(), Resources.none())) { + CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), + NodeType.NODE_LOCAL); - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.ACCEPTED, - ActivityDiagnosticConstant.EMPTY); + while (canAssign(clusterResource, node)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to assign containers to child-queue of " + + getQueueName()); + } - if (node.getReservedContainer() == null) { - if (rootQueue) { - ActivitiesLogger.NODE.finishAllocatedNodeAllocation( - activitiesManager, node, - assignedToChild.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId(), - AllocationState.ALLOCATED); - } - } else { + // Are we over maximum-capacity for this queue? + // This will also consider parent's limits and also continuous reservation + // looking + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + resourceLimits, Resources + .createResource(getMetrics().getReservedMB(), + getMetrics().getReservedVirtualCores()), schedulingMode)) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); if (rootQueue) { - ActivitiesLogger.NODE.finishAllocatedNodeAllocation( - activitiesManager, node, - assignedToChild.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId(), - AllocationState.RESERVED); + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); } + + break; } - // Track resource utilization for the parent-queue - allocateResource(clusterResource, assignedToChild.getResource(), - node.getPartition(), assignedToChild.isIncreasedAllocation()); - - // Track resource utilization in this pass of the scheduler - Resources - .addTo(assignment.getResource(), assignedToChild.getResource()); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - assignedToChild.getAssignmentInformation().getAllocated()); - Resources.addTo(assignment.getAssignmentInformation().getReserved(), - assignedToChild.getAssignmentInformation().getReserved()); - assignment.getAssignmentInformation().incrAllocations( - assignedToChild.getAssignmentInformation().getNumAllocations()); - assignment.getAssignmentInformation().incrReservations( - assignedToChild.getAssignmentInformation().getNumReservations()); - assignment - .getAssignmentInformation() - .getAllocationDetails() - .addAll( - assignedToChild.getAssignmentInformation().getAllocationDetails()); - assignment - .getAssignmentInformation() - .getReservationDetails() - .addAll( + // Schedule + CSAssignment assignedToChild = assignContainersToChildQueues( + clusterResource, node, resourceLimits, schedulingMode); + assignment.setType(assignedToChild.getType()); + + // Done if no child-queue assigned anything + if (Resources.greaterThan(resourceCalculator, clusterResource, + assignedToChild.getResource(), Resources.none())) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + + if (node.getReservedContainer() == null) { + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.ALLOCATED); + } + } else{ + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.RESERVED); + } + } + + // Track resource utilization for the parent-queue + allocateResource(clusterResource, assignedToChild.getResource(), + node.getPartition(), assignedToChild.isIncreasedAllocation()); + + // Track resource utilization in this pass of the scheduler + Resources.addTo(assignment.getResource(), + assignedToChild.getResource()); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + assignedToChild.getAssignmentInformation().getAllocated()); + Resources.addTo(assignment.getAssignmentInformation().getReserved(), + assignedToChild.getAssignmentInformation().getReserved()); + assignment.getAssignmentInformation().incrAllocations( + assignedToChild.getAssignmentInformation().getNumAllocations()); + assignment.getAssignmentInformation().incrReservations( + assignedToChild.getAssignmentInformation().getNumReservations()); + assignment.getAssignmentInformation().getAllocationDetails().addAll( + assignedToChild.getAssignmentInformation() + .getAllocationDetails()); + assignment.getAssignmentInformation().getReservationDetails().addAll( assignedToChild.getAssignmentInformation() .getReservationDetails()); - assignment.setIncreasedAllocation(assignedToChild - .isIncreasedAllocation()); - - LOG.info("assignedContainer" + - " queue=" + getQueueName() + - " usedCapacity=" + getUsedCapacity() + - " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + queueUsage.getUsed() + - " cluster=" + clusterResource); - - } else { - assignment.setSkippedType(assignedToChild.getSkippedType()); + assignment.setIncreasedAllocation( + assignedToChild.isIncreasedAllocation()); - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } + LOG.info("assignedContainer" + " queue=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + + " cluster=" + clusterResource); - break; - } + } else{ + assignment.setSkippedType(assignedToChild.getSkippedType()); - if (LOG.isDebugEnabled()) { - LOG.debug("ParentQ=" + getQueueName() - + " assignedSoFarInThisIteration=" + assignment.getResource() - + " usedCapacity=" + getUsedCapacity() - + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity()); - } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + + break; + } - // Do not assign more than one container if this isn't the root queue - // or if we've already assigned an off-switch container - if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) { if (LOG.isDebugEnabled()) { - if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) { - LOG.debug("Not assigning more than one off-switch container," + - " assignments so far: " + assignment); + LOG.debug( + "ParentQ=" + getQueueName() + " assignedSoFarInThisIteration=" + + assignment.getResource() + " usedCapacity=" + + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity()); + } + + // Do not assign more than one container if this isn't the root queue + // or if we've already assigned an off-switch container + if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) { + if (LOG.isDebugEnabled()) { + if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) { + LOG.debug("Not assigning more than one off-switch container," + + " assignments so far: " + assignment); + } } + break; } - break; } - } - - return assignment; + + return assignment; + } finally { + writeLock.unlock(); + } } private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { @@ -628,7 +666,7 @@ public class ParentQueue extends AbstractCSQueue { return childrenList.iterator(); } - private synchronized CSAssignment assignContainersToChildQueues( + private CSAssignment assignContainersToChildQueues( Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, SchedulingMode schedulingMode) { CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; @@ -717,39 +755,45 @@ public class ParentQueue extends AbstractCSQueue { } } - private synchronized void internalReleaseResource(Resource clusterResource, + private void internalReleaseResource(Resource clusterResource, FiCaSchedulerNode node, Resource releasedResource, boolean changeResource, CSQueue completedChildQueue, boolean sortQueues) { - super.releaseResource(clusterResource, - releasedResource, node.getPartition(), - changeResource); + try { + writeLock.lock(); + super.releaseResource(clusterResource, releasedResource, + node.getPartition(), changeResource); - if (LOG.isDebugEnabled()) { - LOG.debug("completedContainer " + this + ", cluster=" + clusterResource); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "completedContainer " + this + ", cluster=" + clusterResource); + } - // Note that this is using an iterator on the childQueues so this can't - // be called if already within an iterator for the childQueues. Like - // from assignContainersToChildQueues. - if (sortQueues) { - // reinsert the updated queue - for (Iterator iter = childQueues.iterator(); iter.hasNext();) { - CSQueue csqueue = iter.next(); - if (csqueue.equals(completedChildQueue)) { - iter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Re-sorting completed queue: " + csqueue); + // Note that this is using an iterator on the childQueues so this can't + // be called if already within an iterator for the childQueues. Like + // from assignContainersToChildQueues. + if (sortQueues) { + // reinsert the updated queue + for (Iterator iter = childQueues.iterator(); + iter.hasNext(); ) { + CSQueue csqueue = iter.next(); + if (csqueue.equals(completedChildQueue)) { + iter.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("Re-sorting completed queue: " + csqueue); + } + childQueues.add(csqueue); + break; } - childQueues.add(csqueue); - break; } } - } - // If we skipped sort queue this time, we need to resort queues to make - // sure we allocate from least usage (or order defined by queue policy) - // queues. - needToResortQueuesAtNextAllocation = !sortQueues; + // If we skipped sort queue this time, we need to resort queues to make + // sure we allocate from least usage (or order defined by queue policy) + // queues. + needToResortQueuesAtNextAllocation = !sortQueues; + } finally { + writeLock.unlock(); + } } @Override @@ -806,24 +850,35 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized void updateClusterResource(Resource clusterResource, + public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits) { - // Update all children - for (CSQueue childQueue : childQueues) { - // Get ResourceLimits of child queue before assign containers - ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, - clusterResource, resourceLimits.getLimit(), - RMNodeLabelsManager.NO_LABEL); - childQueue.updateClusterResource(clusterResource, childLimits); + try { + writeLock.lock(); + // Update all children + for (CSQueue childQueue : childQueues) { + // Get ResourceLimits of child queue before assign containers + ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, + clusterResource, resourceLimits.getLimit(), + RMNodeLabelsManager.NO_LABEL); + childQueue.updateClusterResource(clusterResource, childLimits); + } + + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); + } finally { + writeLock.unlock(); } - - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, null); } @Override - public synchronized List getChildQueues() { - return new ArrayList(childQueues); + public List getChildQueues() { + try { + readLock.lock(); + return new ArrayList(childQueues); + } finally { + readLock.unlock(); + } + } @Override @@ -832,13 +887,18 @@ public class ParentQueue extends AbstractCSQueue { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } - // Careful! Locking order is important! - synchronized (this) { - FiCaSchedulerNode node = - scheduler.getNode(rmContainer.getContainer().getNodeId()); + + // Careful! Locking order is important! + try { + writeLock.lock(); + FiCaSchedulerNode node = scheduler.getNode( + rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, rmContainer.getContainer().getResource(), node.getPartition(), false); + } finally { + writeLock.unlock(); } + if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); } @@ -851,11 +911,17 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized void collectSchedulerApplications( + public void collectSchedulerApplications( Collection apps) { - for (CSQueue queue : childQueues) { - queue.collectSchedulerApplications(apps); + try { + readLock.lock(); + for (CSQueue queue : childQueues) { + queue.collectSchedulerApplications(apps); + } + } finally { + readLock.unlock(); } + } @Override @@ -897,44 +963,49 @@ public class ParentQueue extends AbstractCSQueue { } } - public synchronized int getNumApplications() { + public int getNumApplications() { return numApplications; } - synchronized void allocateResource(Resource clusterResource, + void allocateResource(Resource clusterResource, Resource resource, String nodePartition, boolean changeContainerResource) { - super.allocateResource(clusterResource, resource, nodePartition, - changeContainerResource); - - /** - * check if we need to kill (killable) containers if maximum resource violated. - * Doing this because we will deduct killable resource when going from root. - * For example: - *
-     *      Root
-     *      /   \
-     *     a     b
-     *   /  \
-     *  a1  a2
-     * 
- * - * a: max=10G, used=10G, killable=2G - * a1: used=8G, killable=2G - * a2: used=2G, pending=2G, killable=0G - * - * When we get queue-a to allocate resource, even if queue-a - * reaches its max resource, we deduct its used by killable, so we can allocate - * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G. - * - * If scheduler finds a 2G available resource in existing cluster, and assigns it - * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G - * - * When this happens, we have to preempt killable container (on same or different - * nodes) of parent queue to avoid violating parent's max resource. - */ - if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition) - < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) { - killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource); + try { + writeLock.lock(); + super.allocateResource(clusterResource, resource, nodePartition, + changeContainerResource); + + /** + * check if we need to kill (killable) containers if maximum resource violated. + * Doing this because we will deduct killable resource when going from root. + * For example: + *
+       *      Root
+       *      /   \
+       *     a     b
+       *   /  \
+       *  a1  a2
+       * 
+ * + * a: max=10G, used=10G, killable=2G + * a1: used=8G, killable=2G + * a2: used=2G, pending=2G, killable=0G + * + * When we get queue-a to allocate resource, even if queue-a + * reaches its max resource, we deduct its used by killable, so we can allocate + * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G. + * + * If scheduler finds a 2G available resource in existing cluster, and assigns it + * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G + * + * When this happens, we have to preempt killable container (on same or different + * nodes) of parent queue to avoid violating parent's max resource. + */ + if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition) + < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) { + killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource); + } + } finally { + writeLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 7b53ad5..a391f25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -79,76 +79,98 @@ public class PlanQueue extends ParentQueue { } @Override - public synchronized void reinitialize(CSQueue newlyParsedQueue, + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof PlanQueue) - || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { - throw new IOException("Trying to reinitialize " + getQueuePath() - + " from " + newlyParsedQueue.getQueuePath()); - } + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } - PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; + PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; - if (newlyParsedParentQueue.getChildQueues().size() > 0) { - throw new IOException( - "Reservable Queue should not have sub-queues in the" - + "configuration"); - } + if (newlyParsedParentQueue.getChildQueues().size() > 0) { + throw new IOException( + "Reservable Queue should not have sub-queues in the" + + "configuration"); + } - // Set new configs - setupQueueConfigs(clusterResource); + // Set new configs + setupQueueConfigs(clusterResource); - updateQuotas(newlyParsedParentQueue.userLimit, - newlyParsedParentQueue.userLimitFactor, - newlyParsedParentQueue.maxAppsForReservation, - newlyParsedParentQueue.maxAppsPerUserForReservation); + updateQuotas(newlyParsedParentQueue.userLimit, + newlyParsedParentQueue.userLimitFactor, + newlyParsedParentQueue.maxAppsForReservation, + newlyParsedParentQueue.maxAppsPerUserForReservation); - // run reinitialize on each existing queue, to trigger absolute cap - // recomputations - for (CSQueue res : this.getChildQueues()) { - res.reinitialize(res, clusterResource); + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue res : this.getChildQueues()) { + res.reinitialize(res, clusterResource); + } + showReservationsAsQueues = + newlyParsedParentQueue.showReservationsAsQueues; + } finally { + writeLock.unlock(); } - showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues; } - synchronized void addChildQueue(CSQueue newQueue) + void addChildQueue(CSQueue newQueue) throws SchedulerDynamicEditException { - if (newQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException("Queue " + newQueue - + " being added has non zero capacity."); - } - boolean added = this.childQueues.add(newQueue); - if (LOG.isDebugEnabled()) { - LOG.debug("updateChildQueues (action: add queue): " + added + " " - + getChildQueuesToPrint()); + try { + writeLock.lock(); + if (newQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException( + "Queue " + newQueue + " being added has non zero capacity."); + } + boolean added = this.childQueues.add(newQueue); + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: add queue): " + added + " " + + getChildQueuesToPrint()); + } + } finally { + writeLock.unlock(); } } - synchronized void removeChildQueue(CSQueue remQueue) + void removeChildQueue(CSQueue remQueue) throws SchedulerDynamicEditException { - if (remQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException("Queue " + remQueue - + " being removed has non zero capacity."); - } - Iterator qiter = childQueues.iterator(); - while (qiter.hasNext()) { - CSQueue cs = qiter.next(); - if (cs.equals(remQueue)) { - qiter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed child queue: {}", cs.getQueueName()); + try { + writeLock.lock(); + if (remQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException( + "Queue " + remQueue + " being removed has non zero capacity."); + } + Iterator qiter = childQueues.iterator(); + while (qiter.hasNext()) { + CSQueue cs = qiter.next(); + if (cs.equals(remQueue)) { + qiter.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed child queue: {}", cs.getQueueName()); + } } } + } finally { + writeLock.unlock(); } } - protected synchronized float sumOfChildCapacities() { - float ret = 0; - for (CSQueue l : childQueues) { - ret += l.getCapacity(); + protected float sumOfChildCapacities() { + try { + writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } finally { + writeLock.unlock(); } - return ret; } private void updateQuotas(int userLimit, float userLimitFactor, http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 976cf8c..faeb37e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -51,22 +51,28 @@ public class ReservationQueue extends LeafQueue { } @Override - public synchronized void reinitialize(CSQueue newlyParsedQueue, + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof ReservationQueue) - || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { - throw new IOException("Trying to reinitialize " + getQueuePath() - + " from " + newlyParsedQueue.getQueuePath()); - } - super.reinitialize(newlyParsedQueue, clusterResource); - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, null); + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + super.reinitialize(newlyParsedQueue, clusterResource); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); - updateQuotas(parent.getUserLimitForReservation(), - parent.getUserLimitFactor(), - parent.getMaxApplicationsForReservations(), - parent.getMaxApplicationsPerUserForReservation()); + updateQuotas(parent.getUserLimitForReservation(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForReservations(), + parent.getMaxApplicationsPerUserForReservation()); + } finally { + writeLock.unlock(); + } } /** @@ -77,21 +83,26 @@ public class ReservationQueue extends LeafQueue { * maxCapacity, etc..) * @throws SchedulerDynamicEditException */ - public synchronized void setEntitlement(QueueEntitlement entitlement) + public void setEntitlement(QueueEntitlement entitlement) throws SchedulerDynamicEditException { - float capacity = entitlement.getCapacity(); - if (capacity < 0 || capacity > 1.0f) { - throw new SchedulerDynamicEditException( - "Capacity demand is not in the [0,1] range: " + capacity); - } - setCapacity(capacity); - setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); - // note: we currently set maxCapacity to capacity - // this might be revised later - setMaxCapacity(entitlement.getMaxCapacity()); - if (LOG.isDebugEnabled()) { - LOG.debug("successfully changed to " + capacity + " for queue " - + this.getQueueName()); + try { + writeLock.lock(); + float capacity = entitlement.getCapacity(); + if (capacity < 0 || capacity > 1.0f) { + throw new SchedulerDynamicEditException( + "Capacity demand is not in the [0,1] range: " + capacity); + } + setCapacity(capacity); + setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); + // note: we currently set maxCapacity to capacity + // this might be revised later + setMaxCapacity(entitlement.getMaxCapacity()); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully changed to " + capacity + " for queue " + this + .getQueueName()); + } + } finally { + writeLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 6fba22a..2614630 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -828,8 +828,8 @@ public class TestContainerResizing { app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated checkUsedResource(rm1, "default", 0 * GB, null); - Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default")) - .getUser("user").getUsed().getMemorySize()); + // User will be removed + Assert.assertNull(((LeafQueue) cs.getQueue("default")).getUser("user")); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(0 * GB, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org