hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r1415592 [1/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ hadoop-yarn/hadoop-yarn-server/ha...
Date Fri, 30 Nov 2012 12:03:28 GMT
Author: tomwhite
Date: Fri Nov 30 12:03:25 2012
New Revision: 1415592

URL: http://svn.apache.org/viewvc?rev=1415592&view=rev
Log:
YARN-187. Add hierarchical queues to the fair scheduler. Contributed by Sandy Ryza.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java   (with props)
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java   (with props)
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java   (with props)
Removed:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSQueueSchedulable.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Nov 30 12:03:25 2012
@@ -117,6 +117,8 @@ Release 2.0.3-alpha - Unreleased 
 
     YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy) 
 
+    YARN-187. Add hierarchical queues to the fair scheduler. (Sandy Ryza via tomwhite)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Fri Nov 30 12:03:25 2012
@@ -50,10 +50,10 @@ public class AppSchedulable extends Sche
   private long startTime;
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
-  private FSQueue queue;
+  private FSLeafQueue queue;
   private RMContainerTokenSecretManager containerTokenSecretManager;
 
-  public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSQueue queue) {
+  public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
     this.scheduler = scheduler;
     this.app = app;
     this.startTime = System.currentTimeMillis();
@@ -97,9 +97,6 @@ public class AppSchedulable extends Sche
   }
 
   @Override
-  public void redistributeShare() {}
-
-  @Override
   public Resource getResourceUsage() {
     return app.getCurrentConsumption();
   }
@@ -114,7 +111,7 @@ public class AppSchedulable extends Sche
    * Get metrics reference from containing queue.
    */
   public QueueMetrics getMetrics() {
-    return queue.getQueueSchedulable().getMetrics();
+    return queue.getMetrics();
   }
 
   @Override

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1415592&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Nov 30 12:03:25 2012
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+
+public class FSLeafQueue extends FSQueue {
+  private static final Log LOG = LogFactory.getLog(
+      FSLeafQueue.class.getName());
+    
+  private final List<AppSchedulable> appScheds = 
+      new ArrayList<AppSchedulable>();
+
+  /** Scheduling mode for jobs inside the queue (fair or FIFO) */
+  private SchedulingMode schedulingMode;
+  
+  private final FairScheduler scheduler;
+  private final QueueManager queueMgr;
+  private Resource demand = Resources.createResource(0);
+  
+  // Variables used for preemption
+  private long lastTimeAtMinShare;
+  private long lastTimeAtHalfFairShare;
+  
+  public FSLeafQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
+      FSParentQueue parent) {
+    super(name, queueMgr, scheduler, parent);
+    this.scheduler = scheduler;
+    this.queueMgr = queueMgr;
+    this.lastTimeAtMinShare = scheduler.getClock().getTime();
+    this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
+  }
+  
+  public void addApp(FSSchedulerApp app) {
+    AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
+    app.setAppSchedulable(appSchedulable);
+    appScheds.add(appSchedulable);
+  }
+  
+  // for testing
+  void addAppSchedulable(AppSchedulable appSched) {
+    appScheds.add(appSched);
+  }
+  
+  public void removeApp(FSSchedulerApp app) {
+    for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) {
+      AppSchedulable appSched = it.next();
+      if (appSched.getApp() == app) {
+        it.remove();
+        break;
+      }
+    }
+  }
+  
+  public Collection<AppSchedulable> getAppSchedulables() {
+    return appScheds;
+  }
+
+  public void setSchedulingMode(SchedulingMode mode) {
+    this.schedulingMode = mode;
+  }
+  
+  @Override
+  public void recomputeFairShares() {
+    if (schedulingMode == SchedulingMode.FAIR) {
+      SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
+    } else {
+      for (AppSchedulable sched: appScheds) {
+        sched.setFairShare(Resources.createResource(0));
+      }
+    }
+  }
+
+  @Override
+  public Resource getDemand() {
+    return demand;
+  }
+
+  @Override
+  public Resource getResourceUsage() {
+    Resource usage = Resources.createResource(0);
+    for (AppSchedulable app : appScheds) {
+      Resources.addTo(usage, app.getResourceUsage());
+    }
+    return usage;
+  }
+
+  @Override
+  public void updateDemand() {
+    // Compute demand by iterating through apps in the queue
+    // Limit demand to maxResources
+    Resource maxRes = queueMgr.getMaxResources(getName());
+    demand = Resources.createResource(0);
+    for (AppSchedulable sched : appScheds) {
+      sched.updateDemand();
+      Resource toAdd = sched.getDemand();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
+            + "; Total resource consumption for " + getName() + " now "
+            + demand);
+      }
+      demand = Resources.add(demand, toAdd);
+      if (Resources.greaterThanOrEqual(demand, maxRes)) {
+        demand = maxRes;
+        break;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The updated demand for " + getName() + " is " + demand
+          + "; the max is " + maxRes);
+    }
+  }
+
+  @Override
+  public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+    LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved);
+    // If this queue is over its limit, reject
+    if (Resources.greaterThan(getResourceUsage(),
+        queueMgr.getMaxResources(getName()))) {
+      return Resources.none();
+    }
+
+    // If this node already has reserved resources for an app, first try to
+    // finish allocating resources for that app.
+    if (reserved) {
+      for (AppSchedulable sched : appScheds) {
+        if (sched.getApp().getApplicationAttemptId() ==
+            node.getReservedContainer().getApplicationAttemptId()) {
+          return sched.assignContainer(node, reserved);
+        }
+      }
+      return Resources.none(); // We should never get here
+    }
+
+    // Otherwise, choose app to schedule based on given policy (fair vs fifo).
+    else {
+      Comparator<Schedulable> comparator;
+      if (schedulingMode == SchedulingMode.FIFO) {
+        comparator = new SchedulingAlgorithms.FifoComparator();
+      } else if (schedulingMode == SchedulingMode.FAIR) {
+        comparator = new SchedulingAlgorithms.FairShareComparator();
+      } else {
+        throw new RuntimeException("Unsupported queue scheduling mode " + 
+            schedulingMode);
+      }
+
+      Collections.sort(appScheds, comparator);
+      for (AppSchedulable sched: appScheds) {
+        return sched.assignContainer(node, reserved);
+      }
+
+      return Resources.none();
+    }
+  }
+
+  @Override
+  public Collection<FSQueue> getChildQueues() {
+    return new ArrayList<FSQueue>(1);
+  }
+  
+  @Override
+  public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
+    QueueUserACLInfo userAclInfo =
+      recordFactory.newRecordInstance(QueueUserACLInfo.class);
+    List<QueueACL> operations = new ArrayList<QueueACL>();
+    for (QueueACL operation : QueueACL.values()) {
+      Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
+      if (acls.get(operation).isUserAllowed(user)) {
+        operations.add(operation);
+      }
+    }
+
+    userAclInfo.setQueueName(getQueueName());
+    userAclInfo.setUserAcls(operations);
+    return Collections.singletonList(userAclInfo);
+  }
+  
+  public long getLastTimeAtMinShare() {
+    return lastTimeAtMinShare;
+  }
+
+  public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
+    this.lastTimeAtMinShare = lastTimeAtMinShare;
+  }
+
+  public long getLastTimeAtHalfFairShare() {
+    return lastTimeAtHalfFairShare;
+  }
+
+  public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
+    this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
+  }
+}

Propchange: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1415592&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Nov 30 12:03:25 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+
+public class FSParentQueue extends FSQueue {
+  private static final Log LOG = LogFactory.getLog(
+      FSParentQueue.class.getName());
+
+
+  private final List<FSQueue> childQueues = 
+      new ArrayList<FSQueue>();
+  private final QueueManager queueMgr;
+  private Resource demand = Resources.createResource(0);
+  
+  public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
+      FSParentQueue parent) {
+    super(name, queueMgr, scheduler, parent);
+    this.queueMgr = queueMgr;
+  }
+  
+  public void addChildQueue(FSQueue child) {
+    childQueues.add(child);
+  }
+
+  @Override
+  public void recomputeFairShares() {
+    SchedulingAlgorithms.computeFairShares(childQueues, getFairShare());
+    for (FSQueue childQueue : childQueues) {
+      childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
+      childQueue.recomputeFairShares();
+    }
+  }
+
+  @Override
+  public Resource getDemand() {
+    return demand;
+  }
+
+  @Override
+  public Resource getResourceUsage() {
+    Resource usage = Resources.createResource(0);
+    for (FSQueue child : childQueues) {
+      Resources.addTo(usage, child.getResourceUsage());
+    }
+    return usage;
+  }
+
+  @Override
+  public void updateDemand() {
+    // Compute demand by iterating through the child queues
+    // Limit demand to maxResources
+    Resource maxRes = queueMgr.getMaxResources(getName());
+    demand = Resources.createResource(0);
+    for (FSQueue childQueue : childQueues) {
+      childQueue.updateDemand();
+      Resource toAdd = childQueue.getDemand();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Counting resource from " + childQueue.getName() + " " + 
+            toAdd + "; Total resource consumption for " + getName() +
+            " now " + demand);
+      }
+      demand = Resources.add(demand, toAdd);
+      if (Resources.greaterThanOrEqual(demand, maxRes)) {
+        demand = maxRes;
+        break;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The updated demand for " + getName() + " is " + demand +
+          "; the max is " + maxRes);
+    }    
+  }
+  
+  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
+    synchronized (this) {
+      if (getQueueAcls().get(acl).isUserAllowed(user)) {
+        return true;
+      }
+    }
+    
+    if (parent != null) {
+      return parent.hasAccess(acl, user);
+    }
+    
+    return false;
+  }
+  
+  private synchronized QueueUserACLInfo getUserAclInfo(
+      UserGroupInformation user) {
+    QueueUserACLInfo userAclInfo = 
+      recordFactory.newRecordInstance(QueueUserACLInfo.class);
+    List<QueueACL> operations = new ArrayList<QueueACL>();
+    for (QueueACL operation : QueueACL.values()) {
+      if (hasAccess(operation, user)) {
+        operations.add(operation);
+      } 
+    }
+
+    userAclInfo.setQueueName(getQueueName());
+    userAclInfo.setUserAcls(operations);
+    return userAclInfo;
+  }
+  
+  @Override
+  public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
+      UserGroupInformation user) {
+    List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
+    
+    // Add queue acls
+    userAcls.add(getUserAclInfo(user));
+    
+    // Add children queue acls
+    for (FSQueue child : childQueues) {
+      userAcls.addAll(child.getQueueUserAclInfo(user));
+    }
+ 
+    return userAcls;
+  }
+
+  @Override
+  public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+    throw new IllegalStateException(
+        "Parent queue should not be assigned container");
+  }
+
+  @Override
+  public Collection<FSQueue> getChildQueues() {
+    return childQueues;
+  }
+}

Propchange: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Fri Nov 30 12:03:25 2012
@@ -20,65 +20,112 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-
-/**
- * A queue containing several applications.
- */
-@Private
-@Unstable
-public class FSQueue {
-  /** Queue name. */
-  private String name;
-
-  /** Applications in this specific queue; does not include children queues' jobs. */
-  private Collection<FSSchedulerApp> applications = 
-      new ArrayList<FSSchedulerApp>();
-
-  /** Scheduling mode for jobs inside the queue (fair or FIFO) */
-  private SchedulingMode schedulingMode;
-
-  private FairScheduler scheduler;
-
-  private FSQueueSchedulable queueSchedulable;
-
-  public FSQueue(FairScheduler scheduler, String name) {
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+
+public abstract class FSQueue extends Schedulable implements Queue {
+  private final String name;
+  private final QueueManager queueMgr;
+  private final FairScheduler scheduler;
+  private final QueueMetrics metrics;
+  
+  protected final FSParentQueue parent;
+  protected final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  
+  public FSQueue(String name, QueueManager queueMgr, 
+      FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
-    this.queueSchedulable = new FSQueueSchedulable(scheduler, this);
+    this.queueMgr = queueMgr;
     this.scheduler = scheduler;
+    this.metrics = QueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
+    this.parent = parent;
   }
-
-  public Collection<FSSchedulerApp> getApplications() {
-    return applications;
-  }
-
-  public void addApp(FSSchedulerApp app) {
-    applications.add(app);
-    AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
-    app.setAppSchedulable(appSchedulable);
-    queueSchedulable.addApp(appSchedulable);
-  }
-
-  public void removeApp(FSSchedulerApp app) {
-    applications.remove(app);
-    queueSchedulable.removeApp(app);
-  }
-
+  
   public String getName() {
     return name;
   }
-
-  public SchedulingMode getSchedulingMode() {
-    return schedulingMode;
-  }
-
-  public void setSchedulingMode(SchedulingMode schedulingMode) {
-    this.schedulingMode = schedulingMode;
-  }
-
-  public FSQueueSchedulable getQueueSchedulable() {
-    return queueSchedulable;
+  
+  @Override
+  public String getQueueName() {
+    return name;
   }
+  
+  @Override
+  public double getWeight() {
+    return queueMgr.getQueueWeight(getName());
+  }
+  
+  @Override
+  public Resource getMinShare() {
+    return queueMgr.getMinResources(getName());
+  }
+
+  @Override
+  public long getStartTime() {
+    return 0;
+  }
+
+  @Override
+  public Priority getPriority() {
+    Priority p = recordFactory.newRecordInstance(Priority.class);
+    p.setPriority(1);
+    return p;
+  }
+  
+  @Override
+  public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
+    QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
+    queueInfo.setQueueName(getQueueName());
+    // TODO: we might change these queue metrics around a little bit
+    // to match the semantics of the fair scheduler.
+    queueInfo.setCapacity((float) getFairShare().getMemory() /
+        scheduler.getClusterCapacity().getMemory());
+    queueInfo.setCapacity((float) getResourceUsage().getMemory() /
+        scheduler.getClusterCapacity().getMemory());
+    
+    ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
+    if (includeChildQueues) {
+      Collection<FSQueue> childQueues = getChildQueues();
+      for (FSQueue child : childQueues) {
+        childQueueInfos.add(child.getQueueInfo(recursive, recursive));
+      }
+    }
+    queueInfo.setChildQueues(childQueueInfos);
+    queueInfo.setQueueState(QueueState.RUNNING);
+    return queueInfo;
+  }
+  
+  @Override
+  public Map<QueueACL, AccessControlList> getQueueAcls() {
+    Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
+    return new HashMap<QueueACL, AccessControlList>(acls);
+  }
+  
+  @Override
+  public QueueMetrics getMetrics() {
+    return metrics;
+  }
+  
+  /**
+   * Recomputes the fair shares for all queues and applications
+   * under this queue.
+   */
+  public abstract void recomputeFairShares();
+  
+  /**
+   * Gets the children of this queue, if any.
+   */
+  public abstract Collection<FSQueue> getChildQueues();
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Nov 30 12:03:25 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -75,6 +76,25 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 
+/**
+ * A scheduler that schedules resources between a set of queues. The scheduler
+ * keeps track of the resources used by each queue, and attempts to maintain
+ * fairness by scheduling tasks at queues whose allocations are farthest below
+ * an ideal fair distribution.
+ * 
+ * The fair scheduler supports hierarchical queues. All queues descend from a
+ * queue named "root". Available resources are distributed among the children
+ * of the root queue in the typical fair scheduling fashion. Then, the children
+ * distribute the resources assigned to them to their children in the same
+ * fashion.  Applications may only be scheduled on leaf queues. Queues can be
+ * specified as children of other queues by placing them as sub-elements of their
+ * parents in the fair scheduler configuration file.
+ * 
+ * A queue's name starts with the names of its parents, with periods as
+ * separators.  So a queue named "queue1" under the root queue would be
+ * referred to as "root.queue1", and a queue named "queue2" under a queue
+ * named "parent1" would be referred to as "root.parent1.queue2".
+ */
 @LimitedPrivate("yarn")
 @Unstable
 @SuppressWarnings("unchecked")
@@ -105,23 +125,22 @@ public class FairScheduler implements Re
   // Aggregate metrics
   QueueMetrics rootMetrics;
 
-  //Time when we last updated preemption vars
+  // Time when we last updated preemption vars
   protected long lastPreemptionUpdateTime;
-  //Time we last ran preemptTasksIfNecessary
+  // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-
   // This stores per-application scheduling information, indexed by
   // attempt ID's for fast lookup.
-  protected Map<ApplicationAttemptId, FSSchedulerApp> applications
-  = new HashMap<ApplicationAttemptId, FSSchedulerApp>();
+  protected Map<ApplicationAttemptId, FSSchedulerApp> applications = 
+      new HashMap<ApplicationAttemptId, FSSchedulerApp>();
 
   // Nodes in the cluster, indexed by NodeId
-  private Map<NodeId, FSSchedulerNode> nodes =
+  private Map<NodeId, FSSchedulerNode> nodes = 
       new ConcurrentHashMap<NodeId, FSSchedulerNode>();
 
   // Aggregate capacity of the cluster
-  private Resource clusterCapacity =
+  private Resource clusterCapacity = 
       RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
 
   // How often tasks are preempted (must be longer than a couple
@@ -131,10 +150,11 @@ public class FairScheduler implements Re
   protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
-  protected double nodeLocalityThreshold;   // Cluster threshold for node locality
-  protected double rackLocalityThreshold;   // Cluster threshold for rack locality
-  private FairSchedulerEventLog eventLog;   // Machine-readable event log
-  protected boolean assignMultiple; // Allocate multiple containers per heartbeat
+  protected double nodeLocalityThreshold; // Cluster threshold for node locality
+  protected double rackLocalityThreshold; // Cluster threshold for rack locality
+  private FairSchedulerEventLog eventLog; // Machine-readable event log
+  protected boolean assignMultiple; // Allocate multiple containers per
+                                    // heartbeat
   protected int maxAssign; // Max containers to assign per heartbeat
   
   public FairScheduler() {
@@ -150,16 +170,8 @@ public class FairScheduler implements Re
     return queueMgr;
   }
 
-  public List<FSQueueSchedulable> getQueueSchedulables() {
-    List<FSQueueSchedulable> scheds = new ArrayList<FSQueueSchedulable>();
-    for (FSQueue queue: queueMgr.getQueues()) {
-      scheds.add(queue.getQueueSchedulable());
-    }
-    return scheds;
-  }
-
   private RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp application =
+    FSSchedulerApp application = 
         applications.get(containerId.getApplicationAttemptId());
     return (application == null) ? null : application.getRMContainer(containerId);
   }
@@ -183,34 +195,24 @@ public class FairScheduler implements Re
   }
 
   /**
-  * Recompute the internal variables used by the scheduler - per-job weights,
-  * fair shares, deficits, minimum slot allocations, and amount of used and
-  * required resources per job.
-  */
+   * Recompute the internal variables used by the scheduler - per-job weights,
+   * fair shares, deficits, minimum slot allocations, and amount of used and
+   * required resources per job.
+   */
   protected synchronized void update() {
     queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
     updateRunnability(); // Set job runnability based on user/queue limits
     updatePreemptionVariables(); // Determine if any queues merit preemption
 
-    // Update demands of apps and queues
-    for (FSQueue queue: queueMgr.getQueues()) {
-      queue.getQueueSchedulable().updateDemand();
-    }
+    FSQueue rootQueue = queueMgr.getRootQueue();
 
-    // Compute fair shares based on updated demands
-    List<FSQueueSchedulable> queueScheds = getQueueSchedulables();
-    SchedulingAlgorithms.computeFairShares(
-        queueScheds, clusterCapacity);
+    // Recursively update demands for all queues
+    rootQueue.updateDemand();
 
-    // Update queue metrics for this queue
-    for (FSQueueSchedulable sched : queueScheds) {
-      sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
-    }
-
-    // Use the computed shares to assign shares within each queue
-    for (FSQueue queue: queueMgr.getQueues()) {
-      queue.getQueueSchedulable().redistributeShare();
-    }
+    rootQueue.setFairShare(clusterCapacity);
+    // Recursively compute fair shares for all queues
+    // and update metrics
+    rootQueue.recomputeFairShares();
 
     // Update recorded capacity of root queue (child queues are updated
     // when fair share is calculated).
@@ -225,7 +227,7 @@ public class FairScheduler implements Re
   private void updatePreemptionVariables() {
     long now = clock.getTime();
     lastPreemptionUpdateTime = now;
-    for (FSQueueSchedulable sched: getQueueSchedulables()) {
+    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       if (!isStarvedForMinShare(sched)) {
         sched.setLastTimeAtMinShare(now);
       }
@@ -238,16 +240,16 @@ public class FairScheduler implements Re
   /**
    * Is a queue below its min share for the given task type?
    */
-  boolean isStarvedForMinShare(FSQueueSchedulable sched) {
+  boolean isStarvedForMinShare(FSLeafQueue sched) {
     Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
     return Resources.lessThan(sched.getResourceUsage(), desiredShare);
   }
 
   /**
-   * Is a queue being starved for fair share for the given task type?
-   * This is defined as being below half its fair share.
+   * Is a queue being starved for fair share for the given task type? This is
+   * defined as being below half its fair share.
    */
-  boolean isStarvedForFairShare(FSQueueSchedulable sched) {
+  boolean isStarvedForFairShare(FSLeafQueue sched) {
     Resource desiredFairShare = Resources.max(
         Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
     return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
@@ -255,10 +257,10 @@ public class FairScheduler implements Re
 
   /**
    * Check for queues that need tasks preempted, either because they have been
-   * below their guaranteed share for minSharePreemptionTimeout or they
-   * have been below half their fair share for the fairSharePreemptionTimeout.
-   * If such queues exist, compute how many tasks of each type need to be
-   * preempted and then select the right ones using preemptTasks.
+   * below their guaranteed share for minSharePreemptionTimeout or they have
+   * been below half their fair share for the fairSharePreemptionTimeout. If
+   * such queues exist, compute how many tasks of each type need to be preempted
+   * and then select the right ones using preemptTasks.
    */
   protected synchronized void preemptTasksIfNecessary() {
     if (!preemptionEnabled) {
@@ -273,35 +275,37 @@ public class FairScheduler implements Re
 
     Resource resToPreempt = Resources.none();
 
-    for (FSQueueSchedulable sched: getQueueSchedulables()) {
+    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
     }
     if (Resources.greaterThan(resToPreempt, Resources.none())) {
-      preemptResources(getQueueSchedulables(), resToPreempt);
+      preemptResources(queueMgr.getLeafQueues(), resToPreempt);
     }
   }
 
   /**
-   * Preempt a quantity of resources from a list of QueueSchedulables.
-   * The policy for this is to pick apps from queues that are over their fair
-   * share, but make sure that no queue is placed below its fair share in the
-   * process. We further prioritize preemption by choosing containers with
-   * lowest priority to preempt.
+   * Preempt a quantity of resources from a collection of leaf queues. The
+   * policy for this is to pick apps from queues that are over their fair share,
+   * but make sure that no queue is placed below its fair share in the process.
+   * We further prioritize preemption by choosing containers with lowest
+   * priority to preempt.
    */
-  protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
+  protected void preemptResources(Collection<FSLeafQueue> scheds,
+      Resource toPreempt) {
     if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
       return;
     }
 
     Map<RMContainer, FSSchedulerApp> apps = 
         new HashMap<RMContainer, FSSchedulerApp>();
-    Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
+    Map<RMContainer, FSLeafQueue> queues = 
+        new HashMap<RMContainer, FSLeafQueue>();
 
     // Collect running containers from over-scheduled queues
     List<RMContainer> runningContainers = new ArrayList<RMContainer>();
-    for (FSQueueSchedulable sched: scheds) {
+    for (FSLeafQueue sched : scheds) {
       if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
-        for (AppSchedulable as: sched.getAppSchedulables()) {
+        for (AppSchedulable as : sched.getAppSchedulables()) {
           for (RMContainer c : as.getApp().getLiveContainers()) {
             runningContainers.add(c);
             apps.put(c, as.getApp());
@@ -321,12 +325,12 @@ public class FairScheduler implements Re
 
     // Scan down the sorted list of task statuses until we've killed enough
     // tasks, making sure we don't kill too many from any queue
-    for (RMContainer container: runningContainers) {
-     FSQueueSchedulable sched = queues.get(container);
+    for (RMContainer container : runningContainers) {
+      FSLeafQueue sched = queues.get(container);
       if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
         LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
             "res=" + container.getContainer().getResource() +
-            ") from queue " + sched.getQueue().getName());
+            ") from queue " + sched.getName());
         ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
@@ -348,12 +352,12 @@ public class FairScheduler implements Re
    * If the queue has been below its min share for at least its preemption
    * timeout, it should preempt the difference between its current share and
    * this min share. If it has been below half its fair share for at least the
-   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
-   * its full fair share. If both conditions hold, we preempt the max of the
-   * two amounts (this shouldn't happen unless someone sets the timeouts to
-   * be identical for some reason).
+   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
+   * full fair share. If both conditions hold, we preempt the max of the two
+   * amounts (this shouldn't happen unless someone sets the timeouts to be
+   * identical for some reason).
    */
-  protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
+  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
     String queue = sched.getName();
     long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
     long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
@@ -362,7 +366,7 @@ public class FairScheduler implements Re
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
       Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
       resDueToMinShare = Resources.max(Resources.none(),
-                            Resources.subtract(target, sched.getResourceUsage()));
+          Resources.subtract(target, sched.getResourceUsage()));
     }
     if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
       Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
@@ -380,15 +384,15 @@ public class FairScheduler implements Re
   }
 
   /**
-   * This updates the runnability of all apps based on whether or not
-   * any users/queues have exceeded their capacity.
+   * This updates the runnability of all apps based on whether or not any
+   * users/queues have exceeded their capacity.
    */
   private void updateRunnability() {
     List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
 
     // Start by marking everything as not runnable
-    for (FSQueue p: queueMgr.getQueues()) {
-      for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) {
+    for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
+      for (AppSchedulable a : leafQueue.getAppSchedulables()) {
         a.setRunnable(false);
         apps.add(a);
       }
@@ -400,7 +404,7 @@ public class FairScheduler implements Re
     Map<String, Integer> userApps = new HashMap<String, Integer>();
     Map<String, Integer> queueApps = new HashMap<String, Integer>();
 
-    for (AppSchedulable app: apps) {
+    for (AppSchedulable app : apps) {
       String user = app.getApp().getUser();
       String queue = app.getApp().getQueueName();
       int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
@@ -473,22 +477,25 @@ public class FairScheduler implements Re
   }
 
   /**
-   * Add a new application to the scheduler, with a given id, queue name,
-   * and user. This will accept a new app even if the user or queue is above
+   * Add a new application to the scheduler, with a given id, queue name, and
+   * user. This will accept a new app even if the user or queue is above
    * configured limits, but the app will not be marked as runnable.
    */
-  protected synchronized void
-  addApplication(ApplicationAttemptId applicationAttemptId,
-      String queueName, String user) {
+  protected synchronized void addApplication(
+      ApplicationAttemptId applicationAttemptId, String queueName, String user) {
 
-    FSQueue queue = queueMgr.getQueue(queueName);
+    FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
+    if (queue == null) {
+      // queue is not an existing or creatable leaf queue
+      queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    }
 
     FSSchedulerApp schedulerApp =
         new FSSchedulerApp(applicationAttemptId, user,
-            queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()),
+            queue, new ActiveUsersManager(getRootQueueMetrics()),
             rmContext);
-
-    // Inforce ACLs
+    
+    // Enforce ACLs
     UserGroupInformation userUgi;
     try {
       userUgi = UserGroupInformation.getCurrentUser();
@@ -497,8 +504,8 @@ public class FairScheduler implements Re
       return;
     }
 
-    List<QueueUserACLInfo> info = queue.getQueueSchedulable().getQueueUserAclInfo(
-        userUgi); // Always a signleton list
+    // Always a singleton list
+    List<QueueUserACLInfo> info = queue.getQueueUserAclInfo(userUgi);
     if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
       LOG.info("User " + userUgi.getUserName() +
           " cannot submit" + " applications to queue " + queue.getName());
@@ -506,14 +513,13 @@ public class FairScheduler implements Re
     }
 
     queue.addApp(schedulerApp);
-    queue.getQueueSchedulable().getMetrics().submitApp(user,
-    		applicationAttemptId.getAttemptId());
+    queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
     rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
 
     applications.put(applicationAttemptId, schedulerApp);
 
     LOG.info("Application Submission: " + applicationAttemptId +
-        ", user: " + user +
+        ", user: "+ user +
         ", currently active: " + applications.size());
 
     rmContext.getDispatcher().getEventHandler().handle(
@@ -540,10 +546,10 @@ public class FairScheduler implements Re
           SchedulerUtils.createAbnormalContainerStatus(
               rmContainer.getContainerId(),
               SchedulerUtils.COMPLETED_APPLICATION),
-          RMContainerEventType.KILL);
+              RMContainerEventType.KILL);
     }
 
-     // Release all reserved containers
+    // Release all reserved containers
     for (RMContainer rmContainer : application.getReservedContainers()) {
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
@@ -556,7 +562,8 @@ public class FairScheduler implements Re
     application.stop(rmAppAttemptFinalState);
 
     // Inform the queue
-    FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName());
+    FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+        .getQueueName());
     queue.removeApp(application);
 
     // Remove from our data-structure
@@ -658,11 +665,11 @@ public class FairScheduler implements Re
     for (ContainerId releasedContainerId : release) {
       RMContainer rmContainer = getRMContainer(releasedContainerId);
       if (rmContainer == null) {
-         RMAuditLogger.logFailure(application.getUser(),
-             AuditConstants.RELEASE_CONTAINER,
-             "Unauthorized access or invalid container", "FairScheduler",
-             "Trying to release container not owned by app or with invalid id",
-             application.getApplicationId(), releasedContainerId);
+        RMAuditLogger.logFailure(application.getUser(),
+            AuditConstants.RELEASE_CONTAINER,
+            "Unauthorized access or invalid container", "FairScheduler",
+            "Trying to release container not owned by app or with invalid id",
+            application.getApplicationId(), releasedContainerId);
       }
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
@@ -675,8 +682,8 @@ public class FairScheduler implements Re
       if (!ask.isEmpty()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("allocate: pre-update" +
-            " applicationAttemptId=" + appAttemptId +
-            " application=" + application.getApplicationId());
+              " applicationAttemptId=" + appAttemptId +
+              " application=" + application.getApplicationId());
         }
         application.showRequests();
 
@@ -689,19 +696,17 @@ public class FairScheduler implements Re
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("allocate:" +
-          " applicationAttemptId=" + appAttemptId +
-          " #ask=" + ask.size());
+            " applicationAttemptId=" + appAttemptId +
+            " #ask=" + ask.size());
       }
 
-      return new Allocation(
-          application.pullNewlyAllocatedContainers(),
+      return new Allocation(application.pullNewlyAllocatedContainers(),
           application.getHeadroom());
     }
   }
 
   /**
-   * Process a container which has launched on a node, as reported by the
-   * node.
+   * Process a container which has launched on a node, as reported by the node.
    */
   private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
     // Get the application for the finished container
@@ -757,20 +762,20 @@ public class FairScheduler implements Re
       LOG.info("Trying to fulfill reservation for application " +
           reservedApplication.getApplicationId() + " on node: " + nm);
 
-      FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName());
-      queue.getQueueSchedulable().assignContainer(node, true);
+      FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
+      queue.assignContainer(node, true);
     }
 
-
     // Otherwise, schedule at queue which is furthest below fair share
     else {
       int assignedContainers = 0;
       while (true) {
         // At most one task is scheduled each iteration of this loop
-        List<FSQueueSchedulable> scheds = getQueueSchedulables();
+        List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
+            queueMgr.getLeafQueues());
         Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
         boolean assignedContainer = false;
-        for (FSQueueSchedulable sched : scheds) {
+        for (FSLeafQueue sched : scheds) {
           Resource assigned = sched.assignContainer(node, false);
           if (Resources.greaterThan(assigned, Resources.none())) {
             eventLog.log("ASSIGN", nm.getHostName(), assigned);
@@ -813,7 +818,7 @@ public class FairScheduler implements Re
 
   @Override
   public void handle(SchedulerEvent event) {
-    switch(event.getType()) {
+    switch (event.getType()) {
     case NODE_ADDED:
       if (!(event instanceof NodeAddedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
@@ -832,8 +837,7 @@ public class FairScheduler implements Re
       if (!(event instanceof NodeUpdateSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
-      NodeUpdateSchedulerEvent nodeUpdatedEvent =
-      (NodeUpdateSchedulerEvent)event;
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
       nodeUpdate(nodeUpdatedEvent.getRMNode(),
           nodeUpdatedEvent.getNewlyLaunchedContainers(),
           nodeUpdatedEvent.getCompletedContainers());
@@ -842,7 +846,7 @@ public class FairScheduler implements Re
       if (!(event instanceof AppAddedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
       String queue = appAddedEvent.getQueue();
 
       // Potentially set queue to username if configured to do so
@@ -867,7 +871,7 @@ public class FairScheduler implements Re
         throw new RuntimeException("Unexpected event type: " + event);
       }
       ContainerExpiredSchedulerEvent containerExpiredEvent =
-          (ContainerExpiredSchedulerEvent) event;
+          (ContainerExpiredSchedulerEvent)event;
       ContainerId containerId = containerExpiredEvent.getContainerId();
       completedContainer(getRMContainer(containerId),
           SchedulerUtils.createAbnormalContainerStatus(
@@ -886,8 +890,8 @@ public class FairScheduler implements Re
   }
 
   @Override
-  public synchronized void
-      reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+      throws IOException {
     if (!initialized) {
       this.conf = new FairSchedulerConfiguration(conf);
       rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
@@ -909,11 +913,10 @@ public class FairScheduler implements Re
 
       try {
         queueMgr.initialize();
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         throw new IOException("Failed to start FairScheduler", e);
       }
-      
+
       Thread updateThread = new Thread(new UpdateThread());
       updateThread.setName("FairSchedulerUpdateThread");
       updateThread.setDaemon(true);
@@ -925,10 +928,9 @@ public class FairScheduler implements Re
       rackLocalityThreshold = this.conf.getLocalityThresholdRack();
       preemptionEnabled = this.conf.getPreemptionEnabled();
       try {
-       queueMgr.reloadAllocs();
+        queueMgr.reloadAllocs();
 
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         throw new IOException("Failed to initialize FairScheduler", e);
       }
     }
@@ -940,8 +942,8 @@ public class FairScheduler implements Re
     if (!queueMgr.exists(queueName)) {
       return null;
     }
-    return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo(
-        includeChildQueues, recursive);
+    return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
+        recursive);
   }
 
   @Override
@@ -953,12 +955,7 @@ public class FairScheduler implements Re
       return new ArrayList<QueueUserACLInfo>();
     }
 
-    List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
-
-    for (FSQueue queue : queueMgr.getQueues()) {
-      userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user));
-    }
-    return userAcls;
+    return queueMgr.getRootQueue().getQueueUserAclInfo(user);
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Nov 30 12:03:25 2012
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -52,6 +53,7 @@ import org.xml.sax.SAXException;
 /**
  * Maintains a list of queues as well as scheduling parameters for each queue,
  * such as guaranteed share allocations, from the fair scheduler config file.
+ * 
  */
 @Private
 @Unstable
@@ -59,6 +61,8 @@ public class QueueManager {
   public static final Log LOG = LogFactory.getLog(
     QueueManager.class.getName());
 
+  public static final String ROOT_QUEUE = "root";
+  
   /** Time to wait between checks of the allocation file */
   public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
 
@@ -76,7 +80,10 @@ public class QueueManager {
                             // used) or a String to specify an absolute path (if
                             // mapred.fairscheduler.allocation.file is used).
 
-  private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+  private final Collection<FSLeafQueue> leafQueues = 
+      new CopyOnWriteArrayList<FSLeafQueue>();
+  private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+  private FSParentQueue rootQueue;
 
   private volatile QueueManagerInfo info = new QueueManagerInfo();
   
@@ -87,10 +94,17 @@ public class QueueManager {
   public QueueManager(FairScheduler scheduler) {
     this.scheduler = scheduler;
   }
+  
+  public FSParentQueue getRootQueue() {
+    return rootQueue;
+  }
 
   public void initialize() throws IOException, SAXException,
       AllocationConfigurationException, ParserConfigurationException {
     FairSchedulerConfiguration conf = scheduler.getConf();
+    rootQueue = new FSParentQueue("root", this, scheduler, null);
+    queues.put(rootQueue.getName(), rootQueue);
+    
     this.allocFile = conf.getAllocationFile();
     if (allocFile == null) {
       // No allocation file specified in jobconf. Use the default allocation
@@ -106,21 +120,106 @@ public class QueueManager {
     lastSuccessfulReload = scheduler.getClock().getTime();
     lastReloadAttempt = scheduler.getClock().getTime();
     // Create the default queue
-    getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
   }
-
+  
   /**
-   * Get a queue by name, creating it if necessary
-   */
-  public FSQueue getQueue(String name) {
+   * Get a leaf queue by name, creating it if necessary.  If the queue
+   * is not or cannot be a leaf queue, i.e. it already exists as a parent queue,
+   * or one of the parents in its name is already a leaf queue, null is returned.
+   * 
+   * The root part of the name is optional, so a queue underneath the root 
+   * named "queue1" could be referred to as just "queue1", and a queue named
+   * "queue2" underneath a parent named "parent1" that is underneath the root 
+   * could be referred to as just "parent1.queue2".
+   */
+  public FSLeafQueue getLeafQueue(String name) {
+    if (!name.startsWith(ROOT_QUEUE + ".")) {
+      name = ROOT_QUEUE + "." + name;
+    }
     synchronized (queues) {
       FSQueue queue = queues.get(name);
       if (queue == null) {
-        queue = new FSQueue(scheduler, name);
-        queue.setSchedulingMode(info.defaultSchedulingMode);
-        queues.put(name, queue);
+        FSLeafQueue leafQueue = createLeafQueue(name);
+        if (leafQueue == null) {
+          return null;
+        }
+        leafQueue.setSchedulingMode(info.defaultSchedulingMode);
+        queue = leafQueue;
+      } else if (queue instanceof FSParentQueue) {
+        return null;
+      }
+      return (FSLeafQueue)queue;
+    }
+  }
+  
+  /**
+   * Creates a leaf queue and places it in the tree. Creates any
+   * parents that don't already exist.
+   * 
+   * @return
+   *    the created queue, if successful. null if not allowed (one of the parent
+   *    queues in the queue name is already a leaf queue)
+   */
+  private FSLeafQueue createLeafQueue(String name) {
+    List<String> newQueueNames = new ArrayList<String>();
+    newQueueNames.add(name);
+    int sepIndex = name.length();
+    FSParentQueue parent = null;
+
+    // Move up the queue tree until we reach one that exists.
+    while (sepIndex != -1) {
+      sepIndex = name.lastIndexOf('.', sepIndex-1);
+      FSQueue queue;
+      String curName = null;
+      curName = name.substring(0, sepIndex);
+      queue = queues.get(curName);
+
+      if (queue == null) {
+        newQueueNames.add(curName);
+      } else {
+        if (queue instanceof FSParentQueue) {
+          parent = (FSParentQueue)queue;
+          break;
+        } else {
+          return null;
+        }
       }
-      return queue;
+    }
+    
+    // At this point, parent refers to the deepest existing parent of the
+    // queue to create.
+    // Now that we know everything worked out, make all the queues
+    // and add them to the map.
+    FSLeafQueue leafQueue = null;
+    for (int i = newQueueNames.size()-1; i >= 0; i--) {
+      String queueName = newQueueNames.get(i);
+      if (i == 0) {
+        // First name added was the leaf queue
+        leafQueue = new FSLeafQueue(name, this, scheduler, parent);
+        parent.addChildQueue(leafQueue);
+        queues.put(leafQueue.getName(), leafQueue);
+        leafQueues.add(leafQueue);
+      } else {
+        FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
+        parent.addChildQueue(newParent);
+        queues.put(newParent.getName(), newParent);
+        parent = newParent;
+      }
+    }
+    
+    return leafQueue;
+  }
+
+  /**
+   * Gets a queue by name.
+   */
+  public FSQueue getQueue(String name) {
+    if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+      name = ROOT_QUEUE + "." + name;
+    }
+    synchronized (queues) {
+      return queues.get(name);
     }
   }
 
@@ -136,8 +235,8 @@ public class QueueManager {
   /**
    * Get the queue for a given AppSchedulable.
    */
-  public FSQueue getQueueForApp(AppSchedulable app) {
-    return getQueue(app.getApp().getQueueName());
+  public FSLeafQueue getQueueForApp(AppSchedulable app) {
+    return getLeafQueue(app.getApp().getQueueName());
   }
 
   /**
@@ -237,54 +336,9 @@ public class QueueManager {
       Element element = (Element)node;
       if ("queue".equals(element.getTagName()) ||
     	  "pool".equals(element.getTagName())) {
-        String queueName = element.getAttribute("name");
-        Map<QueueACL, AccessControlList> acls =
-            new HashMap<QueueACL, AccessControlList>();
-        queueNamesInAllocFile.add(queueName);
-        NodeList fields = element.getChildNodes();
-        for (int j = 0; j < fields.getLength(); j++) {
-          Node fieldNode = fields.item(j);
-          if (!(fieldNode instanceof Element))
-            continue;
-          Element field = (Element) fieldNode;
-          if ("minResources".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            int val = Integer.parseInt(text);
-            minQueueResources.put(queueName, Resources.createResource(val));
-          } else if ("maxResources".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            int val = Integer.parseInt(text);
-            maxQueueResources.put(queueName, Resources.createResource(val));
-          } else if ("maxRunningApps".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            int val = Integer.parseInt(text);
-            queueMaxApps.put(queueName, val);
-          } else if ("weight".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            double val = Double.parseDouble(text);
-            queueWeights.put(queueName, val);
-          } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            long val = Long.parseLong(text) * 1000L;
-            minSharePreemptionTimeouts.put(queueName, val);
-          } else if ("schedulingMode".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            queueModes.put(queueName, parseSchedulingMode(text));
-          } else if ("aclSubmitApps".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
-          } else if ("aclAdministerApps".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
-          }
-        }
-        queueAcls.put(queueName, acls);
-        if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
-            && Resources.lessThan(maxQueueResources.get(queueName),
-                minQueueResources.get(queueName))) {
-          LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
-              queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
-        }
+        loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
+            userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+            queueAcls, queueNamesInAllocFile);
       } else if ("user".equals(element.getTagName())) {
         String userName = element.getAttribute("name");
         NodeList fields = element.getChildNodes();
@@ -331,7 +385,7 @@ public class QueueManager {
           queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
           queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
       for (String name: queueNamesInAllocFile) {
-        FSQueue queue = getQueue(name);
+        FSLeafQueue queue = getLeafQueue(name);
         if (queueModes.containsKey(name)) {
           queue.setSchedulingMode(queueModes.get(name));
         } else {
@@ -340,6 +394,75 @@ public class QueueManager {
       }
     }
   }
+  
+  /**
+   * Loads a queue/pool element from the configuration file, recursing into nested ones.
+   * Settings are stored in the given maps under parentName + "." + the "name" attribute.
+   * @param parentName full name of the enclosing queue ("root" at top level)
+   * @param element the queue or pool element to read
+   * @param queueNamesInAllocFile receives the full name of each leaf (childless) queue
+   * @throws AllocationConfigurationException propagated from parseSchedulingMode
+   */
+  private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
+      Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
+      Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
+      Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
+      Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
+      throws AllocationConfigurationException {
+    String queueName = parentName + "." + element.getAttribute("name");
+    Map<QueueACL, AccessControlList> acls = new HashMap<QueueACL, AccessControlList>();
+    NodeList fields = element.getChildNodes();
+    boolean isLeaf = true;
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+      Element field = (Element) fieldNode;
+      String tag = field.getTagName();
+      if ("minResources".equals(tag)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        minQueueResources.put(queueName, Resources.createResource(Integer.parseInt(text)));
+      } else if ("maxResources".equals(tag)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        maxQueueResources.put(queueName, Resources.createResource(Integer.parseInt(text)));
+      } else if ("maxRunningApps".equals(tag)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        queueMaxApps.put(queueName, Integer.parseInt(text));
+      } else if ("weight".equals(tag)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        queueWeights.put(queueName, Double.parseDouble(text));
+      } else if ("minSharePreemptionTimeout".equals(tag)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        minSharePreemptionTimeouts.put(queueName, Long.parseLong(text) * 1000L);
+      } else if ("schedulingMode".equals(tag)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        queueModes.put(queueName, parseSchedulingMode(text));
+      } else if ("aclSubmitApps".equals(tag)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+      } else if ("aclAdministerApps".equals(tag)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+      } else if ("queue".equals(tag) || "pool".equals(tag)) {
+        // equals, not endsWith: "queue".endsWith(tag) would also accept tags such as "ue"
+        loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
+            userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+            queueAcls, queueNamesInAllocFile);
+        isLeaf = false;
+      }
+    }
+    if (isLeaf) {
+      queueNamesInAllocFile.add(queueName);
+    }
+    queueAcls.put(queueName, acls);
+    if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
+        && Resources.lessThan(maxQueueResources.get(queueName), minQueueResources.get(queueName))) {
+      // %s, not %d: the arguments are Resource objects, and %d rejects non-integer arguments
+      LOG.warn(String.format("Queue %s has max resources %s less than min resources %s",
+          queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
+    }
+  }
 
   private SchedulingMode parseSchedulingMode(String text)
       throws AllocationConfigurationException {
@@ -384,9 +507,9 @@ public class QueueManager {
   /**
    * Get a collection of all queues
    */
-  public Collection<FSQueue> getQueues() {
+  public Collection<FSLeafQueue> getLeafQueues() {
     synchronized (queues) {
-      return new ArrayList<FSQueue>(queues.values());
+      return leafQueues;
     }
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Fri Nov 30 12:03:25 2012
@@ -92,12 +92,6 @@ abstract class Schedulable {
   public abstract void updateDemand();
 
   /**
-   * Distribute the fair share assigned to this Schedulable among its
-   * children (used in queues where the internal scheduler is fair sharing).
-   */
-  public abstract void redistributeShare();
-
-  /**
    * Assign a container on this node if possible, and return the amount of
    * resources assigned. If {@code reserved} is true, it means a reservation
    * already exists on this node, and the schedulable should fulfill that

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java Fri Nov 30 12:03:25 2012
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 
 public class FairSchedulerInfo {
@@ -32,9 +32,9 @@ public class FairSchedulerInfo {
   
   public FairSchedulerInfo(FairScheduler fs) {
     scheduler = fs;
-    Collection<FSQueue> queues = fs.getQueueManager().getQueues();
+    Collection<FSLeafQueue> queues = fs.getQueueManager().getLeafQueues();
     queueInfos = new ArrayList<FairSchedulerQueueInfo>();
-    for (FSQueue queue : queues) {
+    for (FSLeafQueue queue : queues) {
       queueInfos.add(new FairSchedulerQueueInfo(queue, fs));
     }
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java Fri Nov 30 12:03:25 2012
@@ -22,9 +22,8 @@ import java.util.Collection;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueSchedulable;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
 
@@ -49,17 +48,16 @@ public class FairSchedulerQueueInfo {
   
   private String queueName;
   
-  public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
-    Collection<FSSchedulerApp> apps = queue.getApplications();
-    for (FSSchedulerApp app : apps) {
-      if (app.isPending()) {
+  public FairSchedulerQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
+    Collection<AppSchedulable> apps = queue.getAppSchedulables();
+    for (AppSchedulable app : apps) {
+      if (app.getApp().isPending()) {
         numPendingApps++;
       } else {
         numActiveApps++;
       }
     }
     
-    FSQueueSchedulable schedulable = queue.getQueueSchedulable();
     QueueManager manager = scheduler.getQueueManager();
     
     queueName = queue.getName();
@@ -67,11 +65,11 @@ public class FairSchedulerQueueInfo {
     Resource clusterMax = scheduler.getClusterCapacity();
     clusterMaxMem = clusterMax.getMemory();
     
-    usedResources = schedulable.getResourceUsage();
+    usedResources = queue.getResourceUsage();
     fractionUsed = (float)usedResources.getMemory() / clusterMaxMem;
     
-    fairShare = schedulable.getFairShare().getMemory();
-    minResources = schedulable.getMinShare();
+    fairShare = queue.getFairShare().getMemory();
+    minResources = queue.getMinShare();
     minShare = minResources.getMemory();
     maxResources = scheduler.getQueueManager().getMaxResources(queueName);
     if (maxResources.getMemory() > clusterMaxMem) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1415592&r1=1415591&r2=1415592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Fri Nov 30 12:03:25 2012
@@ -108,8 +108,5 @@ public class FakeSchedulable extends Sch
   }
 
   @Override
-  public void redistributeShare() {}
-
-  @Override
   public void updateDemand() {}
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1415592&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Fri Nov 30 12:03:25 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/** Exercises {@link FSLeafQueue#updateDemand()} against a mocked queue maximum. */
+public class TestFSLeafQueue {
+  private final Resource maxResource = Resources.createResource(10);
+  private FSLeafQueue leafQueue;
+
+  /** Initializes a ResourceManager, starts its dispatcher and builds the queue under test. */
+  @Before
+  public void setup() throws IOException {
+    FairScheduler fairScheduler = new FairScheduler();
+    Configuration conf = createConfiguration();
+    // All tests assume only one assignment per node update
+    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
+    RMStateStore stateStore = StoreFactory.getStore(conf);
+    ResourceManager rm = new ResourceManager(stateStore);
+    rm.init(conf);
+    AsyncDispatcher dispatcher = (AsyncDispatcher) rm.getRMContext().getDispatcher();
+    dispatcher.start();
+    fairScheduler.reinitialize(conf, rm.getRMContext());
+    String queueName = "root.queue1";
+    QueueManager queueManager = Mockito.mock(QueueManager.class);
+    Mockito.when(queueManager.getMaxResources(queueName)).thenReturn(maxResource);
+    leafQueue = new FSLeafQueue(queueName, queueManager, fairScheduler, null);
+  }
+
+  /** Adds an app demanding maxResource twice and expects the queue demand to equal it. */
+  @Test
+  public void testUpdateDemand() {
+    AppSchedulable app = mock(AppSchedulable.class);
+    when(app.getDemand()).thenReturn(maxResource);
+    leafQueue.addAppSchedulable(app);
+    leafQueue.addAppSchedulable(app);
+    leafQueue.updateDemand();
+    boolean demandEqualsMax = Resources.equals(leafQueue.getDemand(), maxResource);
+    assertTrue("Demand is greater than max allowed ", demandEqualsMax);
+  }
+
+  /** Builds a YARN configuration that selects FairScheduler as the RM scheduler class. */
+  private Configuration createConfiguration() {
+    Configuration yarnConf = new YarnConfiguration();
+    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+    return yarnConf;
+  }
+}

Propchange: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message