Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0622F172B9 for ; Wed, 3 Jun 2015 20:53:03 +0000 (UTC) Received: (qmail 44132 invoked by uid 500); 3 Jun 2015 20:53:02 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 44069 invoked by uid 500); 3 Jun 2015 20:53:02 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 44060 invoked by uid 99); 3 Jun 2015 20:53:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Jun 2015 20:53:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9529BE321C; Wed, 3 Jun 2015 20:53:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kasha@apache.org To: common-commits@hadoop.apache.org Message-Id: <286b33e59c844b3d964790e611bb674d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha) Date: Wed, 3 Jun 2015 20:53:02 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 5ecc647ae -> 62d51b889 YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha) (cherry picked from commit edb9cd0f7aa1ecaf34afaa120e3d79583e0ec689) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/62d51b88 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/62d51b88 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/62d51b88 Branch: refs/heads/branch-2 Commit: 62d51b889ac1cce53d6924e338a97ebcf8f6c51c Parents: 5ecc647 Author: Karthik Kambatla Authored: Wed Jun 3 13:47:24 2015 -0700 Committer: Karthik Kambatla Committed: Wed Jun 3 13:50:02 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../scheduler/fair/FSParentQueue.java | 219 ++++++++++++++----- .../scheduler/fair/QueueManager.java | 3 +- 3 files changed, 164 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d51b88/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d2fba36..ce2b6e2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -434,6 +434,8 @@ Release 2.8.0 - UNRELEASED YARN-3751. Fixed AppInfo to check if used resources are null. (Sunil G via zjshen) + YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d51b88/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index f74106a..7d2e5b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -23,6 +23,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,36 +47,64 @@ public class FSParentQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( FSParentQueue.class.getName()); - private final List childQueues = - new ArrayList(); + private final List childQueues = new ArrayList<>(); private Resource demand = Resources.createResource(0); private int runnableApps; - + + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private Lock readLock = rwLock.readLock(); + private Lock writeLock = rwLock.writeLock(); + public FSParentQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); } public void addChildQueue(FSQueue child) { - childQueues.add(child); + writeLock.lock(); + try { + childQueues.add(child); + } finally { + writeLock.unlock(); + } + } + + public void removeChildQueue(FSQueue child) { + writeLock.lock(); + try { + childQueues.remove(child); + } finally { + writeLock.unlock(); + } } @Override public void recomputeShares() { - policy.computeShares(childQueues, getFairShare()); - for (FSQueue childQueue : childQueues) { - childQueue.getMetrics().setFairShare(childQueue.getFairShare()); - childQueue.recomputeShares(); + readLock.lock(); + try { + policy.computeShares(childQueues, getFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics().setFairShare(childQueue.getFairShare()); + childQueue.recomputeShares(); + } + } finally { + readLock.unlock(); } } public void recomputeSteadyShares() { - policy.computeSteadyShares(childQueues, getSteadyFairShare()); - for (FSQueue childQueue : childQueues) { - childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare()); - if (childQueue instanceof FSParentQueue) { - ((FSParentQueue) childQueue).recomputeSteadyShares(); + readLock.lock(); + try { + policy.computeSteadyShares(childQueues, getSteadyFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics() + .setSteadyFairShare(childQueue.getSteadyFairShare()); + if (childQueue instanceof FSParentQueue) { + ((FSParentQueue) childQueue).recomputeSteadyShares(); + } } + } finally { + readLock.unlock(); } } @@ -81,21 +112,37 @@ public class FSParentQueue extends FSQueue { public void updatePreemptionVariables() { super.updatePreemptionVariables(); // For child queues - for (FSQueue childQueue : childQueues) { - childQueue.updatePreemptionVariables(); + + readLock.lock(); + try { + for (FSQueue childQueue : childQueues) { + childQueue.updatePreemptionVariables(); + } + } finally { + readLock.unlock(); } } @Override public Resource getDemand() { - return demand; + readLock.lock(); + try { + return Resource.newInstance(demand.getMemory(), demand.getVirtualCores()); + } finally { + readLock.unlock(); + } } @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (FSQueue child : childQueues) { - Resources.addTo(usage, child.getResourceUsage()); + readLock.lock(); + try { + for (FSQueue child : childQueues) { + Resources.addTo(usage, child.getResourceUsage()); + } + } finally { + readLock.unlock(); } return usage; } @@ -106,20 +153,25 @@ public class FSParentQueue extends FSQueue { // Limit demand to maxResources Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); - demand = Resources.createResource(0); - for (FSQueue childQueue : childQueues) { - childQueue.updateDemand(); - Resource toAdd = childQueue.getDemand(); - if (LOG.isDebugEnabled()) { - LOG.debug("Counting resource from " + childQueue.getName() + " " + - toAdd + "; Total resource consumption for " + getName() + - " now " + demand); - } - demand = Resources.add(demand, toAdd); - demand = Resources.componentwiseMin(demand, maxRes); - if (Resources.equals(demand, maxRes)) { - break; + writeLock.lock(); + try { + demand = Resources.createResource(0); + for (FSQueue childQueue : childQueues) { + childQueue.updateDemand(); + Resource toAdd = childQueue.getDemand(); + if (LOG.isDebugEnabled()) { + LOG.debug("Counting resource from " + childQueue.getName() + " " + + toAdd + "; Total resource consumption for " + getName() + + " now " + demand); + } + demand = Resources.add(demand, toAdd); + demand = Resources.componentwiseMin(demand, maxRes); + if (Resources.equals(demand, maxRes)) { + break; + } } + } finally { + writeLock.unlock(); } if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand + @@ -127,33 +179,31 @@ public class FSParentQueue extends FSQueue { } } - private synchronized QueueUserACLInfo getUserAclInfo( - UserGroupInformation user) { - QueueUserACLInfo userAclInfo = - recordFactory.newRecordInstance(QueueUserACLInfo.class); - List operations = new ArrayList(); + private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) { + List operations = new ArrayList<>(); for (QueueACL operation : QueueACL.values()) { if (hasAccess(operation, user)) { operations.add(operation); } } - - userAclInfo.setQueueName(getQueueName()); - userAclInfo.setUserAcls(operations); - return userAclInfo; + return QueueUserACLInfo.newInstance(getQueueName(), operations); } @Override - public synchronized List getQueueUserAclInfo( - UserGroupInformation user) { + public List getQueueUserAclInfo(UserGroupInformation user) { List userAcls = new ArrayList(); // Add queue acls userAcls.add(getUserAclInfo(user)); // Add children queue acls - for (FSQueue child : childQueues) { - userAcls.addAll(child.getQueueUserAclInfo(user)); + readLock.lock(); + try { + for (FSQueue child : childQueues) { + userAcls.addAll(child.getQueueUserAclInfo(user)); + } + } finally { + readLock.unlock(); } return userAcls; @@ -168,12 +218,32 @@ public class FSParentQueue extends FSQueue { return assigned; } - Collections.sort(childQueues, policy.getComparator()); - for (FSQueue child : childQueues) { - assigned = child.assignContainer(node); - if (!Resources.equals(assigned, Resources.none())) { - break; + // Hold the write lock when sorting childQueues + writeLock.lock(); + try { + Collections.sort(childQueues, policy.getComparator()); + } finally { + writeLock.unlock(); + } + + /* + * We are releasing the lock between the sort and iteration of the + * "sorted" list. There could be changes to the list here: + * 1. Add a child queue to the end of the list, this doesn't affect + * container assignment. + * 2. Remove a child queue, this is probably good to take care of so we + * don't assign to a queue that is going to be removed shortly. + */ + readLock.lock(); + try { + for (FSQueue child : childQueues) { + assigned = child.assignContainer(node); + if (!Resources.equals(assigned, Resources.none())) { + break; + } } + } finally { + readLock.unlock(); } return assigned; } @@ -185,11 +255,17 @@ public class FSParentQueue extends FSQueue { // Find the childQueue which is most over fair share FSQueue candidateQueue = null; Comparator comparator = policy.getComparator(); - for (FSQueue queue : childQueues) { - if (candidateQueue == null || - comparator.compare(queue, candidateQueue) > 0) { - candidateQueue = queue; + + readLock.lock(); + try { + for (FSQueue queue : childQueues) { + if (candidateQueue == null || + comparator.compare(queue, candidateQueue) > 0) { + candidateQueue = queue; + } } + } finally { + readLock.unlock(); } // Let the selected queue choose which of its container to preempt @@ -201,7 +277,12 @@ public class FSParentQueue extends FSQueue { @Override public List getChildQueues() { - return childQueues; + readLock.lock(); + try { + return Collections.unmodifiableList(childQueues); + } finally { + readLock.unlock(); + } } @Override @@ -218,23 +299,43 @@ public class FSParentQueue extends FSQueue { } public void incrementRunnableApps() { - runnableApps++; + writeLock.lock(); + try { + runnableApps++; + } finally { + writeLock.unlock(); + } } public void decrementRunnableApps() { - runnableApps--; + writeLock.lock(); + try { + runnableApps--; + } finally { + writeLock.unlock(); + } } @Override public int getNumRunnableApps() { - return runnableApps; + readLock.lock(); + try { + return runnableApps; + } finally { + readLock.unlock(); + } } @Override public void collectSchedulerApplications( Collection apps) { - for (FSQueue childQueue : childQueues) { - childQueue.collectSchedulerApplications(apps); + readLock.lock(); + try { + for (FSQueue childQueue : childQueues) { + childQueue.collectSchedulerApplications(apps); + } + } finally { + readLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d51b88/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 64442ab..6556717 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -304,7 +304,8 @@ public class QueueManager { } } queues.remove(queue.getName()); - queue.getParent().getChildQueues().remove(queue); + FSParentQueue parent = queue.getParent(); + parent.removeChildQueue(queue); } /**