hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [14/50] hadoop git commit: YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
Date Mon, 08 Jun 2015 17:19:12 GMT
YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b9e8f791
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b9e8f791
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b9e8f791

Branch: refs/heads/YARN-2928
Commit: b9e8f791333fcebd79e96b3ccb8f998572aecaa1
Parents: 95dd42b
Author: Karthik Kambatla <kasha@apache.org>
Authored: Wed Jun 3 13:47:24 2015 -0700
Committer: Zhijie Shen <zjshen@apache.org>
Committed: Mon Jun 8 09:43:14 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../scheduler/fair/FSParentQueue.java           | 219 ++++++++++++++-----
 .../scheduler/fair/QueueManager.java            |   3 +-
 3 files changed, 164 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9e8f791/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 1841d80..fb9badc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -573,6 +573,8 @@ Release 2.8.0 - UNRELEASED
     YARN-3751. Fixed AppInfo to check if used resources are null. (Sunil G via
     zjshen)
 
+    YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9e8f791/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index f74106a..7d2e5b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -23,6 +23,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,36 +47,64 @@ public class FSParentQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(
       FSParentQueue.class.getName());
 
-  private final List<FSQueue> childQueues = 
-      new ArrayList<FSQueue>();
+  private final List<FSQueue> childQueues = new ArrayList<>();
   private Resource demand = Resources.createResource(0);
   private int runnableApps;
-  
+
+  private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+  private Lock readLock = rwLock.readLock();
+  private Lock writeLock = rwLock.writeLock();
+
   public FSParentQueue(String name, FairScheduler scheduler,
       FSParentQueue parent) {
     super(name, scheduler, parent);
   }
   
   public void addChildQueue(FSQueue child) {
-    childQueues.add(child);
+    writeLock.lock();
+    try {
+      childQueues.add(child);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void removeChildQueue(FSQueue child) {
+    writeLock.lock();
+    try {
+      childQueues.remove(child);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
   public void recomputeShares() {
-    policy.computeShares(childQueues, getFairShare());
-    for (FSQueue childQueue : childQueues) {
-      childQueue.getMetrics().setFairShare(childQueue.getFairShare());
-      childQueue.recomputeShares();
+    readLock.lock();
+    try {
+      policy.computeShares(childQueues, getFairShare());
+      for (FSQueue childQueue : childQueues) {
+        childQueue.getMetrics().setFairShare(childQueue.getFairShare());
+        childQueue.recomputeShares();
+      }
+    } finally {
+      readLock.unlock();
     }
   }
 
   public void recomputeSteadyShares() {
-    policy.computeSteadyShares(childQueues, getSteadyFairShare());
-    for (FSQueue childQueue : childQueues) {
-      childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
-      if (childQueue instanceof FSParentQueue) {
-        ((FSParentQueue) childQueue).recomputeSteadyShares();
+    readLock.lock();
+    try {
+      policy.computeSteadyShares(childQueues, getSteadyFairShare());
+      for (FSQueue childQueue : childQueues) {
+        childQueue.getMetrics()
+            .setSteadyFairShare(childQueue.getSteadyFairShare());
+        if (childQueue instanceof FSParentQueue) {
+          ((FSParentQueue) childQueue).recomputeSteadyShares();
+        }
       }
+    } finally {
+      readLock.unlock();
     }
   }
 
@@ -81,21 +112,37 @@ public class FSParentQueue extends FSQueue {
   public void updatePreemptionVariables() {
     super.updatePreemptionVariables();
     // For child queues
-    for (FSQueue childQueue : childQueues) {
-      childQueue.updatePreemptionVariables();
+
+    readLock.lock();
+    try {
+      for (FSQueue childQueue : childQueues) {
+        childQueue.updatePreemptionVariables();
+      }
+    } finally {
+      readLock.unlock();
     }
   }
 
   @Override
   public Resource getDemand() {
-    return demand;
+    readLock.lock();
+    try {
+      return Resource.newInstance(demand.getMemory(), demand.getVirtualCores());
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
   public Resource getResourceUsage() {
     Resource usage = Resources.createResource(0);
-    for (FSQueue child : childQueues) {
-      Resources.addTo(usage, child.getResourceUsage());
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        Resources.addTo(usage, child.getResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
     }
     return usage;
   }
@@ -106,20 +153,25 @@ public class FSParentQueue extends FSQueue {
     // Limit demand to maxResources
     Resource maxRes = scheduler.getAllocationConfiguration()
         .getMaxResources(getName());
-    demand = Resources.createResource(0);
-    for (FSQueue childQueue : childQueues) {
-      childQueue.updateDemand();
-      Resource toAdd = childQueue.getDemand();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Counting resource from " + childQueue.getName() + " " + 
-            toAdd + "; Total resource consumption for " + getName() +
-            " now " + demand);
-      }
-      demand = Resources.add(demand, toAdd);
-      demand = Resources.componentwiseMin(demand, maxRes);
-      if (Resources.equals(demand, maxRes)) {
-        break;
+    writeLock.lock();
+    try {
+      demand = Resources.createResource(0);
+      for (FSQueue childQueue : childQueues) {
+        childQueue.updateDemand();
+        Resource toAdd = childQueue.getDemand();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Counting resource from " + childQueue.getName() + " " +
+              toAdd + "; Total resource consumption for " + getName() +
+              " now " + demand);
+        }
+        demand = Resources.add(demand, toAdd);
+        demand = Resources.componentwiseMin(demand, maxRes);
+        if (Resources.equals(demand, maxRes)) {
+          break;
+        }
       }
+    } finally {
+      writeLock.unlock();
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("The updated demand for " + getName() + " is " + demand +
@@ -127,33 +179,31 @@ public class FSParentQueue extends FSQueue {
     }    
   }
   
-  private synchronized QueueUserACLInfo getUserAclInfo(
-      UserGroupInformation user) {
-    QueueUserACLInfo userAclInfo = 
-      recordFactory.newRecordInstance(QueueUserACLInfo.class);
-    List<QueueACL> operations = new ArrayList<QueueACL>();
+  private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) {
+    List<QueueACL> operations = new ArrayList<>();
     for (QueueACL operation : QueueACL.values()) {
       if (hasAccess(operation, user)) {
         operations.add(operation);
       } 
     }
-
-    userAclInfo.setQueueName(getQueueName());
-    userAclInfo.setUserAcls(operations);
-    return userAclInfo;
+    return QueueUserACLInfo.newInstance(getQueueName(), operations);
   }
   
   @Override
-  public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
-      UserGroupInformation user) {
+  public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
     List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
     
     // Add queue acls
     userAcls.add(getUserAclInfo(user));
     
     // Add children queue acls
-    for (FSQueue child : childQueues) {
-      userAcls.addAll(child.getQueueUserAclInfo(user));
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        userAcls.addAll(child.getQueueUserAclInfo(user));
+      }
+    } finally {
+      readLock.unlock();
     }
  
     return userAcls;
@@ -168,12 +218,32 @@ public class FSParentQueue extends FSQueue {
       return assigned;
     }
 
-    Collections.sort(childQueues, policy.getComparator());
-    for (FSQueue child : childQueues) {
-      assigned = child.assignContainer(node);
-      if (!Resources.equals(assigned, Resources.none())) {
-        break;
+    // Hold the write lock when sorting childQueues
+    writeLock.lock();
+    try {
+      Collections.sort(childQueues, policy.getComparator());
+    } finally {
+      writeLock.unlock();
+    }
+
+    /*
+     * The lock is released between sorting and iterating over the
+     * "sorted" list, so the list may change in between:
+     * 1. A child queue is added to the end of the list; this doesn't affect
+     * container assignment.
+     * 2. A child queue is removed; this is probably good to pick up, so we
+     * don't assign to a queue that is going to be removed shortly.
+     */
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        assigned = child.assignContainer(node);
+        if (!Resources.equals(assigned, Resources.none())) {
+          break;
+        }
       }
+    } finally {
+      readLock.unlock();
     }
     return assigned;
   }
@@ -185,11 +255,17 @@ public class FSParentQueue extends FSQueue {
     // Find the childQueue which is most over fair share
     FSQueue candidateQueue = null;
     Comparator<Schedulable> comparator = policy.getComparator();
-    for (FSQueue queue : childQueues) {
-      if (candidateQueue == null ||
-          comparator.compare(queue, candidateQueue) > 0) {
-        candidateQueue = queue;
+
+    readLock.lock();
+    try {
+      for (FSQueue queue : childQueues) {
+        if (candidateQueue == null ||
+            comparator.compare(queue, candidateQueue) > 0) {
+          candidateQueue = queue;
+        }
       }
+    } finally {
+      readLock.unlock();
     }
 
     // Let the selected queue choose which of its container to preempt
@@ -201,7 +277,12 @@ public class FSParentQueue extends FSQueue {
 
   @Override
   public List<FSQueue> getChildQueues() {
-    return childQueues;
+    readLock.lock();
+    try {
+      return Collections.unmodifiableList(childQueues);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -218,23 +299,43 @@ public class FSParentQueue extends FSQueue {
   }
   
   public void incrementRunnableApps() {
-    runnableApps++;
+    writeLock.lock();
+    try {
+      runnableApps++;
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   public void decrementRunnableApps() {
-    runnableApps--;
+    writeLock.lock();
+    try {
+      runnableApps--;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
   public int getNumRunnableApps() {
-    return runnableApps;
+    readLock.lock();
+    try {
+      return runnableApps;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
-    for (FSQueue childQueue : childQueues) {
-      childQueue.collectSchedulerApplications(apps);
+    readLock.lock();
+    try {
+      for (FSQueue childQueue : childQueues) {
+        childQueue.collectSchedulerApplications(apps);
+      }
+    } finally {
+      readLock.unlock();
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9e8f791/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 64442ab..6556717 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -304,7 +304,8 @@ public class QueueManager {
       }
     }
     queues.remove(queue.getName());
-    queue.getParent().getChildQueues().remove(queue);
+    FSParentQueue parent = queue.getParent();
+    parent.removeChildQueue(queue);
   }
   
   /**


Mime
View raw message