hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject svn commit: r1401669 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ hadoop-yarn/hadoop-yarn...
Date Wed, 24 Oct 2012 13:25:19 GMT
Author: tgraves
Date: Wed Oct 24 13:25:19 2012
New Revision: 1401669

URL: http://svn.apache.org/viewvc?rev=1401669&view=rev
Log:
merge -r 1401667:1401668 from trunk. FIXES: YARN-177

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1401669&r1=1401668&r2=1401669&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Oct 24 13:25:19 2012
@@ -157,6 +157,9 @@ Release 0.23.5 - UNRELEASED
     YARN-174. Modify NodeManager to pass the user's configuration even when
     rebooting. (vinodkv)
 
+    YARN-177. CapacityScheduler - adding a queue while the RM is running has 
+    wacky results (acmurthy via tgraves)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1401669&r1=1401668&r2=1401669&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Wed Oct 24 13:25:19 2012
@@ -51,6 +51,12 @@ extends org.apache.hadoop.yarn.server.re
   public CSQueue getParent();
 
   /**
+   * Set the parent <code>Queue</code>.
+   * @param newParentQueue new parent queue
+   */
+  public void setParent(CSQueue newParentQueue);
+
+  /**
    * Get the queue name.
    * @return the queue name
    */
@@ -195,10 +201,10 @@ extends org.apache.hadoop.yarn.server.re
   
   /**
    * Reinitialize the queue.
-   * @param queue new queue to re-initalize from
+   * @param newlyParsedQueue new queue to re-initialize from
    * @param clusterResource resources in the cluster
    */
-  public void reinitialize(CSQueue queue, Resource clusterResource) 
+  public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) 
   throws IOException;
 
    /**

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1401669&r1=1401668&r2=1401669&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Oct 24 13:25:19 2012
@@ -223,7 +223,7 @@ public class LeafQueue implements CSQueu
   {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
-    float absCapacity = parent.getAbsoluteCapacity() * capacity;
+    float absCapacity = getParent().getAbsoluteCapacity() * capacity;
     CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity);
 
     this.capacity = capacity; 
@@ -256,7 +256,7 @@ public class LeafQueue implements CSQueu
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        this, getParent(), clusterResource, minimumAllocation);
 
     LOG.info("Initializing " + queueName + "\n" +
         "capacity = " + capacity +
@@ -339,10 +339,15 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public CSQueue getParent() {
+  public synchronized CSQueue getParent() {
     return parent;
   }
-
+  
+  @Override
+  public synchronized void setParent(CSQueue newParentQueue) {
+    this.parent = (ParentQueue)newParentQueue;
+  }
+  
   @Override
   public String getQueueName() {
     return queueName;
@@ -350,7 +355,7 @@ public class LeafQueue implements CSQueu
 
   @Override
   public String getQueuePath() {
-    return parent.getQueuePath() + "." + getQueueName();
+    return getParent().getQueuePath() + "." + getQueueName();
   }
 
   /**
@@ -430,7 +435,9 @@ public class LeafQueue implements CSQueu
   synchronized void setMaxCapacity(float maximumCapacity) {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
-    float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
+    float absMaxCapacity = 
+        CSQueueUtils.computeAbsoluteMaximumCapacity(
+            maximumCapacity, getParent());
     CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
     
     this.maximumCapacity = maximumCapacity;
@@ -453,10 +460,6 @@ public class LeafQueue implements CSQueu
     this.userLimitFactor = userLimitFactor;
   }
 
-  synchronized void setParentQueue(CSQueue parent) {
-    this.parent = parent;
-  }
-  
   @Override
   public synchronized int getNumApplications() {
     return getNumPendingApplications() + getNumActiveApplications();
@@ -559,26 +562,28 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public synchronized void reinitialize(CSQueue queue, Resource clusterResource) 
+  public synchronized void reinitialize(
+      CSQueue newlyParsedQueue, Resource clusterResource) 
   throws IOException {
     // Sanity check
-    if (!(queue instanceof LeafQueue) || 
-        !queue.getQueuePath().equals(getQueuePath())) {
+    if (!(newlyParsedQueue instanceof LeafQueue) || 
+        !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
       throw new IOException("Trying to reinitialize " + getQueuePath() + 
-          " from " + queue.getQueuePath());
+          " from " + newlyParsedQueue.getQueuePath());
     }
 
-    LeafQueue leafQueue = (LeafQueue)queue;
+    LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
     setupQueueConfigs(
         clusterResource,
-        leafQueue.capacity, leafQueue.absoluteCapacity, 
-        leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, 
-        leafQueue.userLimit, leafQueue.userLimitFactor, 
-        leafQueue.maxApplications,
-        leafQueue.getMaxApplicationsPerUser(),
-        leafQueue.getMaximumActiveApplications(), 
-        leafQueue.getMaximumActiveApplicationsPerUser(),
-        leafQueue.state, leafQueue.acls);
+        newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, 
+        newlyParsedLeafQueue.maximumCapacity, 
+        newlyParsedLeafQueue.absoluteMaxCapacity, 
+        newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor, 
+        newlyParsedLeafQueue.maxApplications,
+        newlyParsedLeafQueue.getMaxApplicationsPerUser(),
+        newlyParsedLeafQueue.getMaximumActiveApplications(), 
+        newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
+        newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
   }
 
   @Override
@@ -591,7 +596,7 @@ public class LeafQueue implements CSQueu
     }
 
     // Check if parent-queue allows access
-    return parent.hasAccess(acl, user);
+    return getParent().hasAccess(acl, user);
   }
 
   @Override
@@ -649,10 +654,10 @@ public class LeafQueue implements CSQueu
 
     // Inform the parent queue
     try {
-      parent.submitApplication(application, userName, queue);
+      getParent().submitApplication(application, userName, queue);
     } catch (AccessControlException ace) {
       LOG.info("Failed to submit application to parent-queue: " + 
-          parent.getQueuePath(), ace);
+          getParent().getQueuePath(), ace);
       removeApplication(application, user);
       throw ace;
     }
@@ -708,7 +713,7 @@ public class LeafQueue implements CSQueu
     }
 
     // Inform the parent queue
-    parent.finishApplication(application, queue);
+    getParent().finishApplication(application, queue);
   }
 
   public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
@@ -1351,7 +1356,7 @@ public class LeafQueue implements CSQueu
       }
 
       // Inform the parent queue
-      parent.completedContainer(clusterResource, application, 
+      getParent().completedContainer(clusterResource, application, 
           node, rmContainer, null, event);
     }
   }
@@ -1361,7 +1366,7 @@ public class LeafQueue implements CSQueu
     // Update queue metrics
     Resources.addTo(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        this, getParent(), clusterResource, minimumAllocation);
     ++numContainers;
 
     // Update user metrics
@@ -1386,7 +1391,7 @@ public class LeafQueue implements CSQueu
     // Update queue metrics
     Resources.subtractFrom(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        this, getParent(), clusterResource, minimumAllocation);
     --numContainers;
 
     // Update user metrics
@@ -1417,7 +1422,7 @@ public class LeafQueue implements CSQueu
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        this, getParent(), clusterResource, minimumAllocation);
     
     // Update application properties
     for (FiCaSchedulerApp application : activeApplications) {
@@ -1488,7 +1493,7 @@ public class LeafQueue implements CSQueu
     synchronized (this) {
       allocateResource(clusterResource, application, container.getResource());
     }
-    parent.recoverContainer(clusterResource, application, container);
+    getParent().recoverContainer(clusterResource, application, container);
 
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1401669&r1=1401668&r2=1401669&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Oct 24 13:25:19 2012
@@ -60,7 +60,7 @@ public class ParentQueue implements CSQu
 
   private static final Log LOG = LogFactory.getLog(ParentQueue.class);
 
-  private final CSQueue parent;
+  private CSQueue parent;
   private final String queueName;
   
   private float capacity;
@@ -216,11 +216,16 @@ public class ParentQueue implements CSQu
   }
   
   @Override
-  public CSQueue getParent() {
+  public synchronized CSQueue getParent() {
     return parent;
   }
 
   @Override
+  public synchronized void setParent(CSQueue newParentQueue) {
+    this.parent = (ParentQueue)newParentQueue;
+  }
+  
+  @Override
   public String getQueueName() {
     return queueName;
   }
@@ -357,37 +362,52 @@ public class ParentQueue implements CSQu
   }
   
   @Override
-  public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
+  public synchronized void reinitialize(
+      CSQueue newlyParsedQueue, Resource clusterResource)
   throws IOException {
     // Sanity check
-    if (!(queue instanceof ParentQueue) ||
-        !queue.getQueuePath().equals(getQueuePath())) {
+    if (!(newlyParsedQueue instanceof ParentQueue) ||
+        !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
       throw new IOException("Trying to reinitialize " + getQueuePath() +
-          " from " + queue.getQueuePath());
+          " from " + newlyParsedQueue.getQueuePath());
     }
 
-    ParentQueue parentQueue = (ParentQueue)queue;
+    ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
 
     // Set new configs
     setupQueueConfigs(clusterResource,
-        parentQueue.capacity, parentQueue.absoluteCapacity,
-        parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
-        parentQueue.state, parentQueue.acls);
+        newlyParsedParentQueue.capacity, 
+        newlyParsedParentQueue.absoluteCapacity,
+        newlyParsedParentQueue.maximumCapacity, 
+        newlyParsedParentQueue.absoluteMaxCapacity,
+        newlyParsedParentQueue.state, 
+        newlyParsedParentQueue.acls);
 
     // Re-configure existing child queues and add new ones
     // The CS has already checked to ensure all existing child queues are present!
     Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
-    Map<String, CSQueue> newChildQueues = getQueues(parentQueue.childQueues);
+    Map<String, CSQueue> newChildQueues = 
+        getQueues(newlyParsedParentQueue.childQueues);
     for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
       String newChildQueueName = e.getKey();
       CSQueue newChildQueue = e.getValue();
 
       CSQueue childQueue = currentChildQueues.get(newChildQueueName);
-      if (childQueue != null){
+      
+      // Check if the child-queue already exists
+      if (childQueue != null) {
+        // Re-init existing child queues
         childQueue.reinitialize(newChildQueue, clusterResource);
         LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
       } else {
+        // New child queue, do not re-init
+        
+        // Set parent to 'this'
+        newChildQueue.setParent(this);
+        
+        // Save in list of current child queues
         currentChildQueues.put(newChildQueueName, newChildQueue);
+        
         LOG.info(getQueueName() + ": added new child queue: " + newChildQueue);
       }
     }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1401669&r1=1401668&r2=1401669&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Wed Oct 24 13:25:19 2012
@@ -378,4 +378,43 @@ public class TestCapacityScheduler {
 
     Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
   }
+
+  @Test
+  public void testRefreshQueuesWithNewQueue() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
+      null, new RMContainerTokenSecretManager(conf),
+      new ClientToAMTokenSecretManagerInRM()));
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    // Add a new queue b4
+    String B4 = B + ".b4";
+    float B4_CAPACITY = 10;
+    
+    B3_CAPACITY -= B4_CAPACITY;
+    try {
+      conf.setCapacity(A, 80f);
+      conf.setCapacity(B, 20f);
+      conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"});
+      conf.setCapacity(B1, B1_CAPACITY);
+      conf.setCapacity(B2, B2_CAPACITY);
+      conf.setCapacity(B3, B3_CAPACITY);
+      conf.setCapacity(B4, B4_CAPACITY);
+      cs.reinitialize(conf,null);
+      checkQueueCapacities(cs, 80f, 20f);
+      
+      // Verify parent for B4
+      CSQueue rootQueue = cs.getRootQueue();
+      CSQueue queueB = findQueue(rootQueue, B);
+      CSQueue queueB4 = findQueue(queueB, B4);
+
+      assertEquals(queueB, queueB4.getParent());
+    } finally {
+      B3_CAPACITY += B4_CAPACITY;
+    }
+  }
+
 }



Mime
View raw message