Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E90E918FDB for ; Mon, 8 Jun 2015 17:19:08 +0000 (UTC) Received: (qmail 60916 invoked by uid 500); 8 Jun 2015 17:19:00 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 60715 invoked by uid 500); 8 Jun 2015 17:19:00 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 59935 invoked by uid 99); 8 Jun 2015 17:19:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Jun 2015 17:19:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 453EFE0055; Mon, 8 Jun 2015 17:19:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjshen@apache.org To: common-commits@hadoop.apache.org Date: Mon, 08 Jun 2015 17:19:12 -0000 Message-Id: <2bc62272211f46faa4953e2ca6315b38@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/50] hadoop git commit: YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha) YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. 
(kasha) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b9e8f791 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b9e8f791 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b9e8f791 Branch: refs/heads/YARN-2928 Commit: b9e8f791333fcebd79e96b3ccb8f998572aecaa1 Parents: 95dd42b Author: Karthik Kambatla Authored: Wed Jun 3 13:47:24 2015 -0700 Committer: Zhijie Shen Committed: Mon Jun 8 09:43:14 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../scheduler/fair/FSParentQueue.java | 219 ++++++++++++++----- .../scheduler/fair/QueueManager.java | 3 +- 3 files changed, 164 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9e8f791/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1841d80..fb9badc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -573,6 +573,8 @@ Release 2.8.0 - UNRELEASED YARN-3751. Fixed AppInfo to check if used resources are null. (Sunil G via zjshen) + YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. 
(kasha) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9e8f791/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index f74106a..7d2e5b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -23,6 +23,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,36 +47,64 @@ public class FSParentQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( FSParentQueue.class.getName()); - private final List<FSQueue> childQueues = - new ArrayList<FSQueue>(); + private final List<FSQueue> childQueues = new ArrayList<>(); private Resource demand = Resources.createResource(0); private int runnableApps; - + + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private Lock readLock = rwLock.readLock(); + private Lock writeLock = 
rwLock.writeLock(); + public FSParentQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); } public void addChildQueue(FSQueue child) { - childQueues.add(child); + writeLock.lock(); + try { + childQueues.add(child); + } finally { + writeLock.unlock(); + } + } + + public void removeChildQueue(FSQueue child) { + writeLock.lock(); + try { + childQueues.remove(child); + } finally { + writeLock.unlock(); + } } @Override public void recomputeShares() { - policy.computeShares(childQueues, getFairShare()); - for (FSQueue childQueue : childQueues) { - childQueue.getMetrics().setFairShare(childQueue.getFairShare()); - childQueue.recomputeShares(); + readLock.lock(); + try { + policy.computeShares(childQueues, getFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics().setFairShare(childQueue.getFairShare()); + childQueue.recomputeShares(); + } + } finally { + readLock.unlock(); } } public void recomputeSteadyShares() { - policy.computeSteadyShares(childQueues, getSteadyFairShare()); - for (FSQueue childQueue : childQueues) { - childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare()); - if (childQueue instanceof FSParentQueue) { - ((FSParentQueue) childQueue).recomputeSteadyShares(); + readLock.lock(); + try { + policy.computeSteadyShares(childQueues, getSteadyFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics() + .setSteadyFairShare(childQueue.getSteadyFairShare()); + if (childQueue instanceof FSParentQueue) { + ((FSParentQueue) childQueue).recomputeSteadyShares(); + } } + } finally { + readLock.unlock(); } } @@ -81,21 +112,37 @@ public class FSParentQueue extends FSQueue { public void updatePreemptionVariables() { super.updatePreemptionVariables(); // For child queues - for (FSQueue childQueue : childQueues) { - childQueue.updatePreemptionVariables(); + + readLock.lock(); + try { + for (FSQueue childQueue : childQueues) { + 
childQueue.updatePreemptionVariables(); + } + } finally { + readLock.unlock(); } } @Override public Resource getDemand() { - return demand; + readLock.lock(); + try { + return Resource.newInstance(demand.getMemory(), demand.getVirtualCores()); + } finally { + readLock.unlock(); + } } @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (FSQueue child : childQueues) { - Resources.addTo(usage, child.getResourceUsage()); + readLock.lock(); + try { + for (FSQueue child : childQueues) { + Resources.addTo(usage, child.getResourceUsage()); + } + } finally { + readLock.unlock(); } return usage; } @@ -106,20 +153,25 @@ public class FSParentQueue extends FSQueue { // Limit demand to maxResources Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); - demand = Resources.createResource(0); - for (FSQueue childQueue : childQueues) { - childQueue.updateDemand(); - Resource toAdd = childQueue.getDemand(); - if (LOG.isDebugEnabled()) { - LOG.debug("Counting resource from " + childQueue.getName() + " " + - toAdd + "; Total resource consumption for " + getName() + - " now " + demand); - } - demand = Resources.add(demand, toAdd); - demand = Resources.componentwiseMin(demand, maxRes); - if (Resources.equals(demand, maxRes)) { - break; + writeLock.lock(); + try { + demand = Resources.createResource(0); + for (FSQueue childQueue : childQueues) { + childQueue.updateDemand(); + Resource toAdd = childQueue.getDemand(); + if (LOG.isDebugEnabled()) { + LOG.debug("Counting resource from " + childQueue.getName() + " " + + toAdd + "; Total resource consumption for " + getName() + + " now " + demand); + } + demand = Resources.add(demand, toAdd); + demand = Resources.componentwiseMin(demand, maxRes); + if (Resources.equals(demand, maxRes)) { + break; + } } + } finally { + writeLock.unlock(); } if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand + @@ -127,33 +179,31 @@ public 
class FSParentQueue extends FSQueue { } } - private synchronized QueueUserACLInfo getUserAclInfo( - UserGroupInformation user) { - QueueUserACLInfo userAclInfo = - recordFactory.newRecordInstance(QueueUserACLInfo.class); - List<QueueACL> operations = new ArrayList<QueueACL>(); + private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) { + List<QueueACL> operations = new ArrayList<>(); for (QueueACL operation : QueueACL.values()) { if (hasAccess(operation, user)) { operations.add(operation); } } - - userAclInfo.setQueueName(getQueueName()); - userAclInfo.setUserAcls(operations); - return userAclInfo; + return QueueUserACLInfo.newInstance(getQueueName(), operations); } @Override - public synchronized List<QueueUserACLInfo> getQueueUserAclInfo( - UserGroupInformation user) { + public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) { List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>(); // Add queue acls userAcls.add(getUserAclInfo(user)); // Add children queue acls - for (FSQueue child : childQueues) { - userAcls.addAll(child.getQueueUserAclInfo(user)); + readLock.lock(); + try { + for (FSQueue child : childQueues) { + userAcls.addAll(child.getQueueUserAclInfo(user)); + } + } finally { + readLock.unlock(); } return userAcls; @@ -168,12 +218,32 @@ public class FSParentQueue extends FSQueue { return assigned; } - Collections.sort(childQueues, policy.getComparator()); - for (FSQueue child : childQueues) { - assigned = child.assignContainer(node); - if (!Resources.equals(assigned, Resources.none())) { - break; + // Hold the write lock when sorting childQueues + writeLock.lock(); + try { + Collections.sort(childQueues, policy.getComparator()); + } finally { + writeLock.unlock(); + } + + /* + * We are releasing the lock between the sort and iteration of the + * "sorted" list. There could be changes to the list here: + * 1. Add a child queue to the end of the list, this doesn't affect + * container assignment. + * 2. 
Remove a child queue, this is probably good to take care of so we + * don't assign to a queue that is going to be removed shortly. + */ + readLock.lock(); + try { + for (FSQueue child : childQueues) { + assigned = child.assignContainer(node); + if (!Resources.equals(assigned, Resources.none())) { + break; + } } + } finally { + readLock.unlock(); } return assigned; } @@ -185,11 +255,17 @@ public class FSParentQueue extends FSQueue { // Find the childQueue which is most over fair share FSQueue candidateQueue = null; Comparator comparator = policy.getComparator(); - for (FSQueue queue : childQueues) { - if (candidateQueue == null || - comparator.compare(queue, candidateQueue) > 0) { - candidateQueue = queue; + + readLock.lock(); + try { + for (FSQueue queue : childQueues) { + if (candidateQueue == null || + comparator.compare(queue, candidateQueue) > 0) { + candidateQueue = queue; + } } + } finally { + readLock.unlock(); } // Let the selected queue choose which of its container to preempt @@ -201,7 +277,12 @@ public class FSParentQueue extends FSQueue { @Override public List getChildQueues() { - return childQueues; + readLock.lock(); + try { + return Collections.unmodifiableList(childQueues); + } finally { + readLock.unlock(); + } } @Override @@ -218,23 +299,43 @@ public class FSParentQueue extends FSQueue { } public void incrementRunnableApps() { - runnableApps++; + writeLock.lock(); + try { + runnableApps++; + } finally { + writeLock.unlock(); + } } public void decrementRunnableApps() { - runnableApps--; + writeLock.lock(); + try { + runnableApps--; + } finally { + writeLock.unlock(); + } } @Override public int getNumRunnableApps() { - return runnableApps; + readLock.lock(); + try { + return runnableApps; + } finally { + readLock.unlock(); + } } @Override public void collectSchedulerApplications( Collection apps) { - for (FSQueue childQueue : childQueues) { - childQueue.collectSchedulerApplications(apps); + readLock.lock(); + try { + for (FSQueue childQueue : 
childQueues) { + childQueue.collectSchedulerApplications(apps); + } + } finally { + readLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9e8f791/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 64442ab..6556717 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -304,7 +304,8 @@ public class QueueManager { } } queues.remove(queue.getName()); - queue.getParent().getChildQueues().remove(queue); + FSParentQueue parent = queue.getParent(); + parent.removeChildQueue(queue); } /**