hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r808308 [1/5] - in /hadoop/mapreduce/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Date Thu, 27 Aug 2009 07:49:01 GMT
Author: yhemanth
Date: Thu Aug 27 07:49:00 2009
New Revision: 808308

URL: http://svn.apache.org/viewvc?rev=808308&view=rev
Log:
MAPREDUCE-824. Add support for a hierarchy of queues in the capacity scheduler. Contributed by Rahul Kumar Singh.

Added:
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskDataView.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Aug 27 07:49:00 2009
@@ -78,6 +78,9 @@
     MAPREDUCE-768. Provide an option to dump jobtracker configuration in JSON
     format to standard output. (V.V.Chaitanya Krishna via yhemanth)
 
+    MAPREDUCE-824. Add support for a hierarchy of queues in the capacity 
+    scheduler. (Rahul Kumar Singh via yhemanth)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template (original)
+++ hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template Thu Aug 27 07:49:00 2009
@@ -14,6 +14,29 @@
       to be available for jobs in this queue.
     </description>    
   </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.subQueues</name>
+    <value></value>
+    <description>Sub-queues are queues configured within queues. 
+       They provide a mechanism for administrators to link logically related queues.
+       Sub-queues can be nested, so there can be queues within a sub-queue.
+    </description>    
+  </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.maximum-capacity</name>
+    <value>-1</value>
+    <description>
+	maximum-capacity-stretch defines a limit beyond which a sub-queue cannot use the capacity of its parent queue.
+	This provides a means to limit how much excess capacity a sub-queue can use. By default, there is no limit.
+	The maximum-capacity-stretch of a queue can only be greater than or equal to its minimum capacity.
+        A value of 100 implies that a sub-queue can use the complete capacity of its parent.
+        This property could be used to curtail certain jobs which are long running in nature from occupying more than a 
+        certain percentage of the cluster, which, in the absence of pre-emption, could lead to the capacity guarantees of 
+        other queues being affected.
+    </description>    
+  </property>
   
   <property>
     <name>mapred.capacity-scheduler.queue.default.supports-priority</name>

Added: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java?rev=808308&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java Thu Aug 27 07:49:00 2009
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * Parent class for hierarchy of queues.
+ * All queues extend this class.
+ * <p/>
+ * Even though all the Queue classes extend this class, there are two categories
+ * of queues defined.
+ * <p/>
+ * 1.ContainerQueue: which are composite of queues.
+ * 2.JobQueue: leaf level queues.
+ * <p/>
+ * Typically a ContainerQueue consists of JobQueues. All the SchedulingContext data
+ * in a ContainerQueue is the cumulative total of its children's data.
+ * <p/>
+ * JobQueue consists of actual job list , i.e, runningJob, WaitingJob etc.
+ * <p/>
+ * This is done to make sure that all the job-related data is in one place
+ * and queues at a higher level typically hold the cumulative data of their
+ * children.
+ */
+
+public abstract class AbstractQueue {
+
+  static final Log LOG = LogFactory.getLog(AbstractQueue.class);
+
+  protected QueueSchedulingContext qsc;
+  protected AbstractQueue parent;
+
+
+  protected AbstractQueue(AbstractQueue parent, QueueSchedulingContext qsc) {
+    this.parent = parent;
+    this.qsc = qsc;
+    // In case of root, this value would be null
+    if (parent != null) {
+      parent.addChild(this);
+    }
+  }
+
+  /**
+   * This involves updating each qC structure.
+   * <p/>
+   * First update QueueSchedulingContext at this level is updated.
+   * then update QueueSchedulingContext of all the children.
+   * <p/>
+   * Children consider the parent's capacity as the total cluster capacity
+   * and do their calculations accordingly.
+   *
+   * @param mapClusterCapacity
+   * @param reduceClusterCapacity
+   */
+
+  public void update(int mapClusterCapacity, int reduceClusterCapacity) {
+    qsc.updateContext(mapClusterCapacity,reduceClusterCapacity);
+  }
+
+  /**
+   * @return qsc
+   */
+  public QueueSchedulingContext getQueueSchedulingContext() {
+    return qsc;
+  }
+
+  String getName() {
+    return qsc.getQueueName();
+  }
+
+  protected AbstractQueue getParent() {
+    return parent;
+  }
+
+  protected void setParent(AbstractQueue queue) {
+    this.parent = queue;
+  }
+
+  /**
+   * Return sorted list of leaf level queues.
+   *
+   * @return
+   */
+  public abstract List<AbstractQueue> getDescendentJobQueues();
+
+  /**
+   * Sorts all levels below current level.
+   *
+   * @param queueComparator
+   */
+  public abstract void sort(Comparator queueComparator);
+
+  /**
+   * returns list of immediate children.
+   * null in case of leaf.
+   *
+   * @return
+   */
+  abstract List<AbstractQueue> getChildren();
+
+  /**
+   * adds children to the current level.
+   * There is no support for adding children at leaf level node.
+   *
+   * @param queue
+   */
+  public abstract void addChild(AbstractQueue queue);
+
+  /**
+   * Distribute the unconfigured capacity % among the queues.
+   *
+   */
+  abstract void distributeUnConfiguredCapacity();
+
+  @Override
+  public String toString() {
+    return this.getName().toString() 
+            + "\n" + getQueueSchedulingContext().toString();
+  }
+
+  @Override
+  /**
+   * Returns true, if the other object is an AbstractQueue
+   * with the same name.
+   */
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (!(other instanceof AbstractQueue)) {
+      return false;
+    }
+    
+    AbstractQueue otherQueue = (AbstractQueue)other;
+    return otherQueue.getName().equals(getName());
+  }
+  
+  @Override
+  public int hashCode() {
+    return this.getName().hashCode();
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Thu Aug 27 07:49:00 2009
@@ -19,19 +19,27 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashSet;
+import java.util.Set;
 
 /**
- * Class providing access to resource manager configuration.
+ * Class providing access to Capacity scheduler configuration.
  * 
- * Resource manager configuration involves setting up queues, and defining
+ * Capacity scheduler configuration involves setting up queues, and defining
  * various properties for the queues. These are typically read from a file 
  * called capacity-scheduler.xml that must be in the classpath of the
  * application. The class provides APIs to get/set and reload the 
  * configuration for the queues.
  */
 class CapacitySchedulerConf {
+
+  static final Log LOG = LogFactory.getLog(CapacitySchedulerConf.class);
+
   
-  /** Default file name from which the resource manager configuration is read. */ 
+  /** Default file name from which the capacity scheduler configuration is read. */
   public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
   
   private int defaultUlimitMinimum;
@@ -40,6 +48,8 @@
   
   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
     "mapred.capacity-scheduler.queue.";
+  
+  private static final String SUBQUEUE_SUFFIX="subQueues";
 
   /**
    * If {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} is set to
@@ -74,7 +84,12 @@
   @Deprecated
   static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
     "mapred.capacity-scheduler.task.limit.maxpmem";
-
+  
+  /**
+   * A maximum capacity defines a limit beyond which a sub-queue
+   * cannot use the capacity of its parent queue.
+   */
+  static final String MAX_CAPACITY ="maximum-capacity";
   /**
    *  Configuration that provides the maximum cap for the map task in a queue
    *  at any given point of time.
@@ -102,29 +117,29 @@
   private Configuration rmConf;
 
   private int defaultMaxJobsPerUsersToInitialize;
-  
+
   /**
-   * Create a new ResourceManagerConf.
+   * Create a new CapacitySchedulerConf.
    * This method reads from the default configuration file mentioned in
-   * {@link RM_CONF_FILE}, that must be present in the classpath of the
+   * {@link SCHEDULER_CONF_FILE}, that must be present in the classpath of the
    * application.
    */
   public CapacitySchedulerConf() {
     rmConf = new Configuration(false);
-    rmConf.addResource(SCHEDULER_CONF_FILE);
+    getCSConf().addResource(SCHEDULER_CONF_FILE);
     initializeDefaults();
   }
 
   /**
-   * Create a new ResourceManagerConf reading the specified configuration
+   * Create a new CapacitySchedulerConf reading the specified configuration
    * file.
    * 
    * @param configFile {@link Path} to the configuration file containing
-   * the resource manager configuration.
+   * the Capacity scheduler configuration.
    */
   public CapacitySchedulerConf(Path configFile) {
     rmConf = new Configuration(false);
-    rmConf.addResource(configFile);
+    getCSConf().addResource(configFile);
     initializeDefaults();
   }
   
@@ -133,11 +148,11 @@
    * which is used by the Capacity Scheduler.
    */
   private void initializeDefaults() {
-    defaultUlimitMinimum = rmConf.getInt(
+    defaultUlimitMinimum = getCSConf().getInt(
         "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
-    defaultSupportPriority = rmConf.getBoolean(
+    defaultSupportPriority = getCSConf().getBoolean(
         "mapred.capacity-scheduler.default-supports-priority", false);
-    defaultMaxJobsPerUsersToInitialize = rmConf.getInt(
+    defaultMaxJobsPerUsersToInitialize = getCSConf().getInt(
         "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
         2);
   }
@@ -163,12 +178,12 @@
     //In case of both capacity and default capacity not configured.
     //Last check is if the configuration is specified and is marked as
     //negative we throw exception
-    String raw = rmConf.getRaw(toFullPropertyName(queue, 
+    String raw = getCSConf().getRaw(toFullPropertyName(queue,
         "capacity"));
     if(raw == null) {
       return -1;
     }
-    float result = rmConf.getFloat(toFullPropertyName(queue, 
+    float result = getCSConf().getFloat(toFullPropertyName(queue,
                                    "capacity"), 
                                    -1);
     if (result < 0.0 || result > 100.0) {
@@ -177,6 +192,62 @@
     }
     return result;
   }
+
+  /**
+   * Get maximum percentage stretch for a queue.
+   * This percentage defines a limit beyond which a
+   * sub-queue cannot use the capacity of its parent queue.
+   * This provides a means to limit how much excess capacity a
+   * sub-queue can use. By default, there is no limit.
+   *
+   * The maximum-capacity-stretch of a queue can only be
+   * greater than or equal to its minimum capacity.
+   * 
+   * @param queue
+   * @return
+   */
+  public float getMaxCapacity(String queue) {
+    String raw = getCSConf().getRaw(toFullPropertyName(queue, MAX_CAPACITY));
+    if(raw == null) {
+      return -1;
+    }
+    float result = getCSConf().getFloat(toFullPropertyName(queue,
+                                                           MAX_CAPACITY),
+                                   -1);
+    result = (result <= 0) ? -1 : result; 
+    if (result > 100.0) {
+      throw new IllegalArgumentException("Illegal maximum-capacity-stretch " +
+        "for queue " + queue +" of " + result);
+    }
+
+    if((result != -1) && (result < getCapacity(queue))) {
+      throw new IllegalArgumentException("maximum-capacity-stretch " +
+        "for a queue should be greater than capacity ");
+    }
+    return result;
+  }
+
+  /**
+   *
+   * @param queue . Complete qualified name of the queue
+   * All the subQueues at same level should have unique names.
+   * 
+   * @return list of subQueues at a level.
+   * @return null if no subQueues specified.
+   */
+  public Set<String> getSubQueues(String queue){
+    String[] result = getCSConf().getStrings(
+      toFullPropertyName(
+        queue,SUBQUEUE_SUFFIX));
+    if(result != null && result.length  > 0){
+      HashSet<String> qs = new HashSet<String>(result.length);
+      for (String q: result){
+        qs.add(q);
+      }
+      return qs;
+    }
+    return null;
+  }
   
   /**
    * Sets the capacity of the given queue.
@@ -184,8 +255,18 @@
    * @param queue name of the queue
    * @param capacity percent of the cluster for the queue.
    */
+  public void setMaxCapacity(String queue,float capacity) {
+    getCSConf().setFloat(toFullPropertyName(queue, MAX_CAPACITY),capacity);
+  }
+
+  /**
+   * Sets the capacity of the given queue.
+   *
+   * @param queue name of the queue
+   * @param capacity percent of the cluster for the queue.
+   */
   public void setCapacity(String queue,float capacity) {
-    rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity);
+    getCSConf().setFloat(toFullPropertyName(queue, "capacity"),capacity);
   }
   
   /**
@@ -198,7 +279,8 @@
    * @return Whether this queue supports priority or not.
    */
   public boolean isPrioritySupported(String queue) {
-    return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
+    checkIfJobQueue(queue);
+    return getCSConf().getBoolean(toFullPropertyName(queue, "supports-priority"),
         defaultSupportPriority);  
   }
   
@@ -210,7 +292,8 @@
    * @param value true, if the queue must support priorities, false otherwise.
    */
   public void setPrioritySupported(String queue, boolean value) {
-    rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+    checkIfJobQueue(queue);
+    getCSConf().setBoolean(toFullPropertyName(queue, "supports-priority"), value);
   }
   
   /**
@@ -229,7 +312,8 @@
    * 
    */
   public int getMinimumUserLimitPercent(String queue) {
-    int userLimit = rmConf.getInt(toFullPropertyName(queue,
+    checkIfJobQueue(queue);
+    int userLimit = getCSConf().getInt(toFullPropertyName(queue,
         "minimum-user-limit-percent"), defaultUlimitMinimum);
     if(userLimit <= 0 || userLimit > 100) {
       throw new IllegalArgumentException("Invalid user limit : "
@@ -237,7 +321,17 @@
     }
     return userLimit;
   }
-  
+
+  private void checkIfJobQueue(String queue) {
+    Set<String> queues = getSubQueues(queue);
+    if(queues == null || queues.isEmpty()) {
+      return;
+    } else {
+      LOG.warn("This property is " +
+        "only allowed for child queue not for container Queue " + queue);
+    }
+  }
+
   /**
    * Set the minimum limit of resources for any user submitting jobs in
    * this queue, in percentage.
@@ -247,7 +341,7 @@
    * in this queue
    */
   public void setMinimumUserLimitPercent(String queue, int value) {
-    rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"), 
+    getCSConf().setInt(toFullPropertyName(queue, "minimum-user-limit-percent"),
                     value);
   }
   
@@ -256,7 +350,7 @@
    * underlying configuration file.
    */
   public synchronized void reloadConfiguration() {
-    rmConf.reloadConfiguration();
+    getCSConf().reloadConfiguration();
     initializeDefaults();
   }
   
@@ -275,7 +369,8 @@
    * or zero.
    */
   public int getMaxJobsPerUserToInitialize(String queue) {
-    int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue,
+    checkIfJobQueue(queue);
+    int maxJobsPerUser = getCSConf().getInt(toFullPropertyName(queue,
         "maximum-initialized-jobs-per-user"), 
         defaultMaxJobsPerUsersToInitialize);
     if(maxJobsPerUser <= 0) {
@@ -293,7 +388,7 @@
    * @param value maximum number of jobs allowed to be initialized per user.
    */
   public void setMaxJobsPerUserToInitialize(String queue, int value) {
-    rmConf.setInt(toFullPropertyName(queue, 
+    getCSConf().setInt(toFullPropertyName(queue,
         "maximum-initialized-jobs-per-user"), value);
   }
 
@@ -308,7 +403,7 @@
    * @throws IllegalArgumentException if time is negative or zero.
    */
   public long getSleepInterval() {
-    long sleepInterval = rmConf.getLong(
+    long sleepInterval = getCSConf().getLong(
         "mapred.capacity-scheduler.init-poll-interval", 
         INITIALIZATION_THREAD_POLLING_INTERVAL);
     
@@ -338,7 +433,7 @@
    * in parallel.
    */
   public int getMaxWorkerThreads() {
-    int maxWorkerThreads = rmConf.getInt(
+    int maxWorkerThreads = getCSConf().getInt(
         "mapred.capacity-scheduler.init-worker-threads", 
         MAX_INITIALIZATION_WORKER_THREADS);
     if(maxWorkerThreads <= 0) {
@@ -354,7 +449,7 @@
    * @param interval sleep interval
    */
   public void setSleepInterval(long interval) {
-    rmConf.setLong(
+    getCSConf().setLong(
         "mapred.capacity-scheduler.init-poll-interval", interval);
   }
   
@@ -366,7 +461,7 @@
    * in parallel.
    */
   public void setMaxWorkerThreads(int poolSize) {
-    rmConf.setInt(
+    getCSConf().setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
 
@@ -376,7 +471,8 @@
    * @return
    */
   public int getMaxMapCap(String queue) {
-    return rmConf.getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
+    checkIfJobQueue(queue);
+    return getCSConf().getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
   }
 
   /**
@@ -385,7 +481,8 @@
    * @param val
    */
   public void setMaxMapCap(String queue,int val) {
-    rmConf.setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
+    checkIfJobQueue(queue);
+    getCSConf().setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
   }
 
   /**
@@ -394,7 +491,8 @@
    * @return
    */
   public int getMaxReduceCap(String queue) {
-    return rmConf.getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);    
+    checkIfJobQueue(queue);
+    return getCSConf().getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);
   }
 
   /**
@@ -403,6 +501,11 @@
    * @param val
    */
   public void setMaxReduceCap(String queue,int val) {
-    rmConf.setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
+    checkIfJobQueue(queue);
+    getCSConf().setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
+  }
+
+  public Configuration getCSConf() {
+    return rmConf;
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Aug 27 07:49:00 2009
@@ -23,7 +23,6 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,7 +31,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
-import org.apache.hadoop.mapred.TaskTrackerStatus;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
@@ -57,218 +55,18 @@
  *  
  */
 class CapacityTaskScheduler extends TaskScheduler {
-  
-  /***********************************************************************
-   * Keeping track of scheduling information for queues
-   * 
-   * We need to maintain scheduling information relevant to a queue (its 
-   * name, capacity, etc), along with information specific to 
-   * each kind of task, Map or Reduce (num of running tasks, pending 
-   * tasks etc). 
-   * 
-   * This scheduling information is used to decide how to allocate
-   * tasks, redistribute capacity, etc.
-   *  
-   * A QueueSchedulingInfo(QSI) object represents scheduling information for
-   * a queue. A TaskSchedulingInfo (TSI) object represents scheduling 
-   * information for a particular kind of task (Map or Reduce).
-   *   
-   **********************************************************************/
-
-  private static class TaskSchedulingInfo {
-    
-    private static final String LIMIT_NORMALIZED_CAPACITY_STRING
-      = "(Capacity is restricted to max limit of %d slots.\n" +
-        "Remaining %d slots will be used by other queues.)\n";
-    /** 
-     * the actual capacity, which depends on how many slots are available
-     * in the cluster at any given time. 
-     */
-    private int capacity = 0;
-    // number of running tasks
-    int numRunningTasks = 0;
-    // number of slots occupied by running tasks
-    int numSlotsOccupied = 0;
-
-    /**
-     * max task limit
-     * This value is the maximum slots that can be used in a
-     * queue at any point of time. So for example assuming above config value
-     * is 100 , not more than 100 tasks would be in the queue at any point of
-     * time, assuming each task takes one slot.
-     */
-    private int maxTaskLimit = -1;
-
-    /**
-     * for each user, we need to keep track of number of slots occupied by
-     * running tasks
-     */
-    Map<String, Integer> numSlotsOccupiedByUser = 
-      new HashMap<String, Integer>();
-
-    /**
-     * reset the variables associated with tasks
-     */
-    void resetTaskVars() {
-      numRunningTasks = 0;
-      numSlotsOccupied = 0;
-      for (String s: numSlotsOccupiedByUser.keySet()) {
-        numSlotsOccupiedByUser.put(s, Integer.valueOf(0));
-      }
-    }
-
 
-    int getMaxTaskLimit() {
-      return maxTaskLimit;
-    }
-
-    void setMaxTaskLimit(int maxTaskCap) {
-      this.maxTaskLimit = maxTaskCap;
-    }
+  /** quick way to get qsc object given a queue name */
+  private Map<String, QueueSchedulingContext> queueInfoMap =
+    new HashMap<String, QueueSchedulingContext>();
+
+  //Root level queue . It has all the
+  //cluster capacity at its disposal.
+  //Queues declared by users would
+  //be children of this queue.
+  //CS would have handle to root.
+  private AbstractQueue root = null;
 
-    /**
-     * This method checks for maxTaskLimit and sends minimum of maxTaskLimit and
-     * capacity.
-     * @return
-     */
-    int getCapacity() {
-      return ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) ? maxTaskLimit :
-        capacity;
-    }
-
-    /**
-     * Mutator method for capacity
-     * @param capacity
-     */
-    void setCapacity(int capacity) {
-        this.capacity = capacity;
-    }
-
-
-    /**
-     * return information about the tasks
-     */
-    @Override
-    public String toString() {
-      float occupiedSlotsAsPercent =
-          getCapacity() != 0 ?
-            ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
-      StringBuffer sb = new StringBuffer();
-      
-      sb.append("Capacity: " + capacity + " slots\n");
-      //If maxTaskLimit is less than the capacity
-      if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
-        sb.append(String.format(LIMIT_NORMALIZED_CAPACITY_STRING,   
-                        maxTaskLimit, (capacity-maxTaskLimit)));
-      }
-      if (maxTaskLimit >= 0) {
-        sb.append(String.format("Maximum Slots Limit: %d\n", maxTaskLimit));
-      }
-      sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
-          Integer.valueOf(numSlotsOccupied), Float
-              .valueOf(occupiedSlotsAsPercent)));
-      sb.append(String.format("Running tasks: %d\n", Integer
-          .valueOf(numRunningTasks)));
-      // include info on active users
-      if (numSlotsOccupied != 0) {
-        sb.append("Active users:\n");
-        for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
-            .entrySet()) {
-          if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
-            // user has no tasks running
-            continue;
-          }
-          sb.append("User '" + entry.getKey() + "': ");
-          int numSlotsOccupiedByThisUser = entry.getValue().intValue();
-          float p =
-              (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
-          sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
-              .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
-        }
-      }
-      return sb.toString();
-    }
-  }
-  
-  private static class QueueSchedulingInfo {
-    String queueName;
-
-    /** capacity(%) is set in the config */ 
-    float capacityPercent = 0;
-    
-    /** 
-     * to handle user limits, we need to know how many users have jobs in 
-     * the queue.
-     */  
-    Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
-      
-    /** min value of user limit (same for all users) */
-    int ulMin;
-    
-    /**
-     * We keep track of the JobQueuesManager only for reporting purposes 
-     * (in toString()). 
-     */
-    private JobQueuesManager jobQueuesManager;
-    
-    /**
-     * We keep a TaskSchedulingInfo object for each kind of task we support
-     */
-    TaskSchedulingInfo mapTSI;
-    TaskSchedulingInfo reduceTSI;
-    
-    public QueueSchedulingInfo(String queueName, float capacityPercent,
-                               int ulMin, JobQueuesManager jobQueuesManager,
-                               int mapCap, int reduceCap) {
-      this.queueName = new String(queueName);
-      this.capacityPercent = capacityPercent;
-      this.ulMin = ulMin;
-      this.jobQueuesManager = jobQueuesManager;
-      this.mapTSI = new TaskSchedulingInfo();
-      this.reduceTSI = new TaskSchedulingInfo();
-      this.mapTSI.setMaxTaskLimit(mapCap);
-      this.reduceTSI.setMaxTaskLimit(reduceCap);
-    }
-    
-    /**
-     * return information about the queue
-     * @return a String representing the information about the queue.
-     */
-    @Override
-    public String toString(){
-      // We print out the queue information first, followed by info
-      // on map and reduce tasks and job info
-      StringBuffer sb = new StringBuffer();
-      sb.append("Queue configuration\n");
-      sb.append("Capacity Percentage: ");
-      sb.append(capacityPercent);
-      sb.append("%\n");
-      sb.append(String.format("User Limit: %d%s\n",ulMin, "%"));
-      sb.append(String.format("Priority Supported: %s\n",
-          (jobQueuesManager.doesQueueSupportPriorities(queueName))?
-              "YES":"NO"));
-      sb.append("-------------\n");
-
-      sb.append("Map tasks\n");
-      sb.append(mapTSI.toString());
-      sb.append("-------------\n");
-      sb.append("Reduce tasks\n");
-      sb.append(reduceTSI.toString());
-      sb.append("-------------\n");
-      
-      sb.append("Job info\n");
-      sb.append(String.format("Number of Waiting Jobs: %d\n", 
-          jobQueuesManager.getWaitingJobCount(queueName)));
-      sb.append(String.format("Number of users who have submitted jobs: %d\n", 
-          numJobsByUser.size()));
-      return sb.toString();
-    }
-  }
-
-  /** quick way to get qsi object given a queue name */
-  private Map<String, QueueSchedulingInfo> queueInfoMap = 
-    new HashMap<String, QueueSchedulingInfo>();
-  
   /**
    * This class captures scheduling information we want to display or log.
    */
@@ -283,12 +81,12 @@
     
     @Override
     public String toString(){
-      // note that we do not call updateQSIObjects() here for performance
+      // note that we do not call updateContextObjects() here for performance
       // reasons. This means that the data we print out may be slightly
       // stale. This data is updated whenever assignTasks() is called
       // If this doesn't happen, the data gets stale. If we see
       // this often, we may need to detect this situation and call 
-      // updateQSIObjects(), or just call it each time. 
+      // updateContextObjects(), or just call it each time.
       return scheduler.getDisplayInfo(queueName);
     }
   }
@@ -337,11 +135,11 @@
     }
   }
 
-  /** 
-   * This class handles the scheduling algorithms. 
-   * The algos are the same for both Map and Reduce tasks. 
+  /**
+   * This class handles the scheduling algorithms.
+   * The algos are the same for both Map and Reduce tasks.
    * There may be slight variations later, in which case we can make this
-   * an abstract base class and have derived classes for Map and Reduce.  
+   * an abstract base class and have derived classes for Map and Reduce.
    */
   private static abstract class TaskSchedulingMgr {
 
@@ -349,69 +147,40 @@
     protected CapacityTaskScheduler scheduler;
     protected TaskType type = null;
 
-    abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
+    abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
         JobInProgress job) throws IOException;
 
-    int getSlotsOccupied(JobInProgress job) {
-      return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) * 
-             getSlotsPerTask(job);
-    }
-
     abstract int getClusterCapacity();
-    abstract int getSlotsPerTask(JobInProgress job);
-    abstract int getRunningTasks(JobInProgress job);
-    abstract int getPendingTasks(JobInProgress job);
-    abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
-    abstract int getNumReservedTaskTrackers(JobInProgress job);
-    
+    abstract TaskSchedulingContext getTSC(
+      QueueSchedulingContext qsc);
     /**
      * To check if job has a speculative task on the particular tracker.
-     * 
+     *
      * @param job job to check for speculative tasks.
      * @param tts task tracker on which speculative task would run.
      * @return true if there is a speculative task to run on the tracker.
      */
-    abstract boolean hasSpeculativeTask(JobInProgress job, 
+    abstract boolean hasSpeculativeTask(JobInProgress job,
         TaskTrackerStatus tts);
 
     /**
-     * Check if the given job has sufficient reserved tasktrackers for all its
-     * pending tasks.
-     * 
-     * @param job job to check for sufficient reserved tasktrackers 
-     * @return <code>true</code> if the job has reserved tasktrackers,
-     *         else <code>false</code>
-     */
-    boolean hasSufficientReservedTaskTrackers(JobInProgress job) {
-      return getNumReservedTaskTrackers(job) >= getPendingTasks(job);
-    }
-    
-    /**
-     * List of QSIs for assigning tasks.
-     * Queues are ordered by a ratio of (# of running tasks)/capacity, which
-     * indicates how much 'free space' the queue has, or how much it is over
-     * capacity. This ordered list is iterated over, when assigning tasks.
-     */  
-    private List<QueueSchedulingInfo> qsiForAssigningTasks = 
-      new ArrayList<QueueSchedulingInfo>();
-
-    /**
      * Comparator to sort queues.
-     * For maps, we need to sort on QueueSchedulingInfo.mapTSI. For 
-     * reducers, we use reduceTSI. So we'll need separate comparators.  
-     */ 
-    private static abstract class QueueComparator 
-      implements Comparator<QueueSchedulingInfo> {
-      abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
-      public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
-        TaskSchedulingInfo t1 = getTSI(q1);
-        TaskSchedulingInfo t2 = getTSI(q2);
+     * For maps, we need to sort on QueueSchedulingContext.mapTSC. For
+     * reducers, we use reduceTSC. So we'll need separate comparators.
+     */
+    private static abstract class QueueComparator
+      implements Comparator<AbstractQueue> {
+      abstract TaskSchedulingContext getTSC(
+        QueueSchedulingContext qsi);
+      public int compare(AbstractQueue q1, AbstractQueue q2) {
+        TaskSchedulingContext t1 = getTSC(q1.getQueueSchedulingContext());
+        TaskSchedulingContext t2 = getTSC(q2.getQueueSchedulingContext());
         // look at how much capacity they've filled. Treat a queue with
         // capacity=0 equivalent to a queue running at capacity
         double r1 = (0 == t1.getCapacity())? 1.0f:
-          (double)t1.numSlotsOccupied/(double) t1.getCapacity();
+          (double) t1.getNumSlotsOccupied() /(double) t1.getCapacity();
         double r2 = (0 == t2.getCapacity())? 1.0f:
-          (double)t2.numSlotsOccupied/(double) t2.getCapacity();
+          (double) t2.getNumSlotsOccupied() /(double) t2.getCapacity();
         if (r1<r2) return -1;
         else if (r1>r2) return 1;
         else return 0;
@@ -419,67 +188,67 @@
     }
     // subclass for map and reduce comparators
     private static final class MapQueueComparator extends QueueComparator {
-      TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-        return qsi.mapTSI;
+      TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+        return qsi.getMapTSC();
       }
     }
     private static final class ReduceQueueComparator extends QueueComparator {
-      TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-        return qsi.reduceTSI;
+      TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+        return qsi.getReduceTSC();
       }
     }
+
     // these are our comparator instances
-    protected final static MapQueueComparator mapComparator = new MapQueueComparator();
-    protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
+    protected final static MapQueueComparator mapComparator =
+      new MapQueueComparator();
+    protected final static ReduceQueueComparator reduceComparator =
+      new ReduceQueueComparator();
     // and this is the comparator to use
     protected QueueComparator queueComparator;
 
     // Returns queues sorted according to the QueueComparator.
     // Mainly for testing purposes.
     String[] getOrderedQueues() {
-      List<String> queues = new ArrayList<String>(qsiForAssigningTasks.size());
-      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        queues.add(qsi.queueName);
+      List<AbstractQueue> queueList = getOrderedJobQueues();
+      List<String> queues = new ArrayList<String>(queueList.size());
+      for (AbstractQueue q : queueList) {
+        queues.add(q.getName());
       }
       return queues.toArray(new String[queues.size()]);
     }
 
+    List<AbstractQueue> getOrderedJobQueues() {
+      scheduler.root.sort(queueComparator);
+      return scheduler.root.getDescendentJobQueues();
+    }
+
     TaskSchedulingMgr(CapacityTaskScheduler sched) {
       scheduler = sched;
     }
-    
-    // let the scheduling mgr know which queues are in the system
-    void initialize(Map<String, QueueSchedulingInfo> qsiMap) { 
-      // add all the qsi objects to our list and sort
-      qsiForAssigningTasks.addAll(qsiMap.values());
-      Collections.sort(qsiForAssigningTasks, queueComparator);
-    }
-    
-    private synchronized void updateCollectionOfQSIs() {
-      Collections.sort(qsiForAssigningTasks, queueComparator);
-    }
-
 
-    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+    private boolean isUserOverLimit(JobInProgress j,
+                                    QueueSchedulingContext qsc) {
       // what is our current capacity? It is equal to the queue-capacity if
       // we're running below capacity. If we're running over capacity, then its
       // #running plus slotPerTask of the job (which is the number of extra
       // slots we're getting).
       int currentCapacity;
-      TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numSlotsOccupied < tsi.getCapacity()) {
+      TaskSchedulingContext tsi = getTSC(qsc);
+      if (tsi.getNumSlotsOccupied() < tsi.getCapacity()) {
         currentCapacity = tsi.getCapacity();
       }
       else {
-        currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
+        currentCapacity =
+          tsi.getNumSlotsOccupied() +
+            TaskDataView.getTaskDataView(type).getSlotsPerTask(j);
       }
       int limit = Math.max((int)(Math.ceil((double)currentCapacity/
-          (double)qsi.numJobsByUser.size())), 
-          (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
+          (double) qsc.getNumJobsByUser().size())),
+          (int)(Math.ceil((double)(qsc.getUlMin() *currentCapacity)/100.0)));
       String user = j.getProfile().getUser();
-      if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
-        LOG.debug("User " + user + " is over limit, num slots occupied = " + 
-            tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit);
+      if (tsi.getNumSlotsOccupiedByUser().get(user) >= limit) {
+        LOG.debug("User " + user + " is over limit, num slots occupied = " +
+            tsi.getNumSlotsOccupiedByUser().get(user) + ", limit = " + limit);
         return true;
       }
       else {
@@ -488,29 +257,30 @@
     }
 
     /*
-     * This is the central scheduling method. 
-     * It tries to get a task from jobs in a single queue. 
-     * Always return a TaskLookupResult object. Don't return null. 
+     * This is the central scheduling method.
+     * It tries to get a task from jobs in a single queue.
+     * Always return a TaskLookupResult object. Don't return null.
      */
     private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
-                                              QueueSchedulingInfo qsi)
+                                              QueueSchedulingContext qsi)
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
       // we only look at jobs in the running queues, as these are the ones
       // who have been potentially initialized
 
-      for (JobInProgress j : 
-        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-        // only look at jobs that can be run. We ignore jobs that haven't 
-        // initialized, or have completed but haven't been removed from the 
-        // running queue. 
+      for (JobInProgress j :
+        scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+          .getRunningJobs()) {
+        // only look at jobs that can be run. We ignore jobs that haven't
+        // initialized, or have completed but haven't been removed from the
+        // running queue.
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
         // check if the job's user is over limit
         if (isUserOverLimit(j, qsi)) {
           continue;
-        } 
+        }
         //If this job meets memory requirements. Ask the JobInProgress for
         //a task to be scheduled on the task tracker.
         //if we find a job then we pass it on.
@@ -526,7 +296,6 @@
             //skip to the next job in the queue.
             LOG.debug("Job " + j.getJobID().toString()
                 + " returned no tasks of type " + type);
-            continue;
           }
         } else {
           // if memory requirements don't match then we check if the job has
@@ -534,7 +303,9 @@
           // tasktrackers to cover all pending tasks. If so we reserve the
           // current tasktracker for this job so that high memory jobs are not
           // starved
-          if ((getPendingTasks(j) != 0 && !hasSufficientReservedTaskTrackers(j))) {
+          TaskDataView view = TaskDataView.getTaskDataView(type);
+          if ((view.getPendingTasks(j) != 0 && 
+                !view.hasSufficientReservedTaskTrackers(j))) {
             // Reserve all available slots on this tasktracker
             LOG.info(j.getJobID() + ": Reserving "
                 + taskTracker.getTrackerName()
@@ -549,25 +320,26 @@
         // if we're here, this job has no task to run. Look at the next job.
       }//end of for loop
 
-      // if we're here, we haven't found any task to run among all jobs in 
-      // the queue. This could be because there is nothing to run, or that 
-      // the user limit for some user is too strict, i.e., there's at least 
-      // one user who doesn't have enough tasks to satisfy his limit. If 
-      // it's the latter case, re-look at jobs without considering user 
+      // if we're here, we haven't found any task to run among all jobs in
+      // the queue. This could be because there is nothing to run, or that
+      // the user limit for some user is too strict, i.e., there's at least
+      // one user who doesn't have enough tasks to satisfy his limit. If
+      // it's the latter case, re-look at jobs without considering user
       // limits, and get a task from the first eligible job; however
-      // we do not 'reserve' slots on tasktrackers anymore since the user is 
+      // we do not 'reserve' slots on tasktrackers anymore since the user is
       // already over the limit
-      // Note: some of the code from above is repeated here. This is on 
-      // purpose as it improves overall readability.  
+      // Note: some of the code from above is repeated here. This is on
+      // purpose as it improves overall readability.
       // Note: we walk through jobs again. Some of these jobs, which weren't
-      // considered in the first pass, shouldn't be considered here again, 
+      // considered in the first pass, shouldn't be considered here again,
       // but we still check for their viability to keep the code simple. In
-      // some cases, for high mem jobs that have nothing to run, we call 
-      // obtainNewTask() unnecessarily. Should this be a problem, we can 
-      // create a list of jobs to look at (those whose users were over 
-      // limit) in the first pass and walk through that list only. 
-      for (JobInProgress j : 
-        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+      // some cases, for high mem jobs that have nothing to run, we call
+      // obtainNewTask() unnecessarily. Should this be a problem, we can
+      // create a list of jobs to look at (those whose users were over
+      // limit) in the first pass and walk through that list only.
+      for (JobInProgress j :
+        scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+          .getRunningJobs()) {
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
@@ -580,87 +352,86 @@
             // we're successful in getting a task
             return TaskLookupResult.getTaskFoundResult(t);
           } else {
-            //skip to the next job in the queue.
-            continue;
           }
         } else {
-          //if memory requirements don't match then we check if the 
+          //if memory requirements don't match then we check if the
           //job has either pending or speculative task. If the job
           //has pending or speculative task we block till this job
-          //tasks get scheduled, so that high memory jobs are not 
+          //tasks get scheduled, so that high memory jobs are not
           //starved
-          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus)) {
+          if (TaskDataView.getTaskDataView(type).getPendingTasks(j) != 0 ||
+            hasSpeculativeTask(j, taskTrackerStatus)) {
             return TaskLookupResult.getMemFailedResult();
-          } 
+          }
         }//end of memory check block
       }//end of for loop
 
       // found nothing for this queue, look at the next one.
-      String msg = "Found no task from the queue " + qsi.queueName;
+      String msg = "Found no task from the queue " + qsi.getQueueName();
       LOG.debug(msg);
       return TaskLookupResult.getNoTaskFoundResult();
     }
 
-    // Always return a TaskLookupResult object. Don't return null. 
-    // The caller is responsible for ensuring that the QSI objects and the 
+    // Always return a TaskLookupResult object. Don't return null.
+    // The caller is responsible for ensuring that the QSC objects and the
     // collections are up-to-date.
-    private TaskLookupResult assignTasks(TaskTracker taskTracker) 
+    private TaskLookupResult assignTasks(TaskTracker taskTracker)
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
 
-      printQSIs();
+      printQSCs();
 
       // Check if this tasktracker has been reserved for a job...
       JobInProgress job = taskTracker.getJobForFallowSlot(type);
       if (job != null) {
         int availableSlots = taskTracker.getAvailableSlots(type);
         if (LOG.isDebugEnabled()) {
-          LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " + 
-                    taskTracker.getTrackerName() + " with " + availableSlots + 
+          LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " +
+                    taskTracker.getTrackerName() + " with " + availableSlots +
                     " '" + type + "' slots");
         }
 
         if (availableSlots >= job.getNumSlotsPerTask(type)) {
-          // Unreserve 
+          // Unreserve
           taskTracker.unreserveSlots(type, job);
-          
+
           // We found a suitable job. Get task from it.
           Task t = obtainNewTask(taskTrackerStatus, job);
           //if there is a task return it immediately.
           if (t != null) {
             if (LOG.isDebugEnabled()) {
-              LOG.info(job.getJobID() + ": Got " + t.getTaskID() + 
-                       " for reserved tasktracker " + 
+              LOG.info(job.getJobID() + ": Got " + t.getTaskID() +
+                       " for reserved tasktracker " +
                        taskTracker.getTrackerName());
             }
             // we're successful in getting a task
             return TaskLookupResult.getTaskFoundResult(t);
-          } 
+          }
         } else {
           // Re-reserve the current tasktracker
           taskTracker.reserveSlots(type, job, availableSlots);
-          
+
           if (LOG.isDebugEnabled()) {
-            LOG.debug(job.getJobID() + ": Re-reserving " + 
+            LOG.debug(job.getJobID() + ": Re-reserving " +
                       taskTracker.getTrackerName());
           }
 
-          return TaskLookupResult.getMemFailedResult(); 
+          return TaskLookupResult.getMemFailedResult();
         }
       }
-      
-      
-      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        // we may have queues with capacity=0. We shouldn't look at jobs from 
+
+      for (AbstractQueue q : getOrderedJobQueues()) {
+        QueueSchedulingContext qsc = q.getQueueSchedulingContext();
+        // we may have queues with capacity=0. We shouldn't look at jobs from
         // these queues
-        if (0 == getTSI(qsi).getCapacity()) {
+        if (0 == getTSC(qsc).getCapacity()) {
           continue;
         }
-        
-        if(this.areTasksInQueueOverLimit(qsi)) {
+
+        if(this.areTasksInQueueOverLimit(qsc)) {
           continue;
         }
-        TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
+        TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc);
         TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
 
         if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
@@ -672,7 +443,7 @@
           return tlr;
         }
         // if there was a memory mismatch, return
-        else if (lookUpStatus == 
+        else if (lookUpStatus ==
           TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {
             return tlr;
         }
@@ -685,65 +456,84 @@
 
     /**
      * Check if the max task limit is set for this queue
-     * if set , ignore this qsi if current num of occupied
-     * slots  of a TYPE in the queue is >= getMaxTaskCap().
-     * @param qsi
+     * if set , ignore this qsc if current num of occupied
+     * slots  of a TYPE in the queue is >= getMaxTaskCap() or
+     * if they have reached their max capacity.
+     * @param qsc
      * @return
      */
 
-    private boolean areTasksInQueueOverLimit(QueueSchedulingInfo qsi) {
-      TaskSchedulingInfo tsi = getTSI(qsi);
+    private boolean areTasksInQueueOverLimit(
+      QueueSchedulingContext qsc) {
+      TaskSchedulingContext tsi = getTSC(qsc);
+      //check for maxTaskLimit
+
       if (tsi.getMaxTaskLimit() >= 0) {
-        if (tsi.numSlotsOccupied >= tsi.getCapacity()) {
+        if (tsi.getNumSlotsOccupied() >= tsi.getCapacity()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
-              "Queue " + qsi.queueName + " has reached its  max " + type +
+              "Queue " + qsc.getQueueName() + " has reached its  max " + type +
                 " limit ");
             LOG.debug("Current running tasks " + tsi.getCapacity());
           }
           return true;
         }
       }
+
+      if(tsi.getMaxCapacity() >= 0) {
+        if(tsi.getNumSlotsOccupied() >= tsi.getMaxCapacity()) {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug(
+              "Queue " + qsc.getQueueName() + " " +
+                "has reached its  max " + type + "Capacity"  ); 
+            LOG.debug("Current running tasks " + tsi.getCapacity());
+
+          }
+          return true;
+        }
+      }
       return false;
     }
 
+
     // for debugging.
-    private void printQSIs() {
+    private void printQSCs() {
       if (LOG.isDebugEnabled()) {
         StringBuffer s = new StringBuffer();
-        for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-          TaskSchedulingInfo tsi = getTSI(qsi);
+        for (AbstractQueue aq: getOrderedJobQueues()) {
+          QueueSchedulingContext qsi = aq.getQueueSchedulingContext();
+          TaskSchedulingContext tsi = getTSC(qsi);
           Collection<JobInProgress> runJobs =
-            scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+            scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+              .getRunningJobs();
           s.append(
             String.format(
               " Queue '%s'(%s): runningTasks=%d, "
                 + "occupiedSlots=%d, capacity=%d, runJobs=%d  maxTaskLimit=%d ",
-              qsi.queueName,
-              this.type, Integer.valueOf(tsi.numRunningTasks), Integer
-                .valueOf(tsi.numSlotsOccupied), Integer
-                .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
-              Integer.valueOf(tsi.getMaxTaskLimit())));
+              qsi.getQueueName(),
+              this.type, tsi.getNumRunningTasks(),
+              tsi.getNumSlotsOccupied(), tsi.getCapacity(), (runJobs.size()),
+              tsi.getMaxTaskLimit()));
         }
         LOG.debug(s);
       }
     }
-    
+
     /**
-     * Check if one of the tasks have a speculative task to execute on the 
+     * Check if one of the tasks have a speculative task to execute on the
      * particular task tracker.
-     * 
+     *
      * @param tips tasks of a job
-     * @param progress percentage progress of the job
      * @param tts task tracker status for which we are asking speculative tip
      * @return true if job has a speculative task to run on particular TT.
      */
-    boolean hasSpeculativeTask(TaskInProgress[] tips, float progress, 
-        TaskTrackerStatus tts) {
+    boolean hasSpeculativeTask(
+      TaskInProgress[] tips,
+      TaskTrackerStatus tts) {
       long currentTime = System.currentTimeMillis();
       for(TaskInProgress tip : tips)  {
-        if(tip.isRunning() 
-            && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName())) 
+        if(tip.isRunning()
+            && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName()))
             && tip.canBeSpeculated(currentTime)) {
           return true;
         }
@@ -753,7 +543,7 @@
   }
 
   /**
-   * The scheduling algorithms for map tasks. 
+   * The scheduling algorithms for map tasks.
    */
   private static class MapSchedulingMgr extends TaskSchedulingMgr {
 
@@ -764,12 +554,12 @@
     }
 
     @Override
-    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
     throws IOException {
-      ClusterStatus clusterStatus = 
+      ClusterStatus clusterStatus =
         scheduler.taskTrackerManager.getClusterStatus();
       int numTaskTrackers = clusterStatus.getTaskTrackers();
-      return job.obtainNewMapTask(taskTracker, numTaskTrackers, 
+      return job.obtainNewMapTask(taskTracker, numTaskTrackers,
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
     }
 
@@ -779,43 +569,24 @@
     }
 
     @Override
-    int getRunningTasks(JobInProgress job) {
-      return job.runningMaps();
+    TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+      return qsi.getMapTSC();
     }
 
-    @Override
-    int getPendingTasks(JobInProgress job) {
-      return job.pendingMaps();
-    }
-
-    @Override
-    int getSlotsPerTask(JobInProgress job) {
-      return 
-        job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());    
-    }
-
-    @Override
-    TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-      return qsi.mapTSI;
-    }
-
-    int getNumReservedTaskTrackers(JobInProgress job) {
-      return job.getNumReservedTaskTrackersForMaps();
-    }
 
     @Override
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
-      //Check if job supports speculative map execution first then 
+      //Check if job supports speculative map execution first then
       //check if job has speculative maps.
       return (job.getJobConf().getMapSpeculativeExecution())&& (
-          hasSpeculativeTask(job.getMapTasks(), 
-              job.getStatus().mapProgress(), tts));
+          hasSpeculativeTask(job.getMapTasks(),
+                             tts));
     }
 
   }
 
   /**
-   * The scheduling algorithms for reduce tasks. 
+   * The scheduling algorithms for reduce tasks.
    */
   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
 
@@ -826,12 +597,12 @@
     }
 
     @Override
-    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
     throws IOException {
-      ClusterStatus clusterStatus = 
+      ClusterStatus clusterStatus =
         scheduler.taskTrackerManager.getClusterStatus();
       int numTaskTrackers = clusterStatus.getTaskTrackers();
-      return job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+      return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
     }
 
@@ -842,28 +613,8 @@
     }
 
     @Override
-    int getRunningTasks(JobInProgress job) {
-      return job.runningReduces();
-    }
-
-    @Override
-    int getPendingTasks(JobInProgress job) {
-      return job.pendingReduces();
-    }
-
-    @Override
-    int getSlotsPerTask(JobInProgress job) {
-      return
-        job.getJobConf().computeNumSlotsPerReduce(scheduler.getMemSizeForReduceSlot());    
-    }
-
-    @Override
-    TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-      return qsi.reduceTSI;
-    }
-
-    int getNumReservedTaskTrackers(JobInProgress job) {
-      return job.getNumReservedTaskTrackersForReduces();
+    TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+      return qsi.getReduceTSC();
     }
 
     @Override
@@ -871,33 +622,24 @@
       //check if the job supports reduce speculative execution first then
       //check if the job has speculative tasks.
       return (job.getJobConf().getReduceSpeculativeExecution()) && (
-          hasSpeculativeTask(job.getReduceTasks(), 
-              job.getStatus().reduceProgress(), tts));
+          hasSpeculativeTask(job.getReduceTasks(),
+                             tts));
     }
 
   }
-  
-  /** the scheduling mgrs for Map and Reduce tasks */ 
+
+  /** the scheduling mgrs for Map and Reduce tasks */
   protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
   protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
 
-  MemoryMatcher memoryMatcher = new MemoryMatcher(this);
+  MemoryMatcher memoryMatcher = new MemoryMatcher();
 
-  /** we keep track of the number of map/reduce slots we saw last */
-  private int prevMapClusterCapacity = 0;
-  private int prevReduceClusterCapacity = 0;
-  
-    
   static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
   protected JobQueuesManager jobQueuesManager;
   protected CapacitySchedulerConf schedConf;
   /** whether scheduler has started or not */
   private boolean started = false;
 
-  final static String JOB_SCHEDULING_INFO_FORMAT_STRING =
-    "%s running map tasks using %d map slots. %d additional slots reserved." +
-    " %s running reduce tasks using %d reduce slots." +
-    " %d additional slots reserved.";
   /**
    * A clock class - can be mocked out for testing.
    */
@@ -910,116 +652,31 @@
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
-  private long memSizeForMapSlotOnJT;
-  private long memSizeForReduceSlotOnJT;
-  private long limitMaxMemForMapTasks;
-  private long limitMaxMemForReduceTasks;
-
   public CapacityTaskScheduler() {
     this(new Clock());
   }
   
   // for testing
   public CapacityTaskScheduler(Clock clock) {
-    this.jobQueuesManager = new JobQueuesManager(this);
+    this.jobQueuesManager = new JobQueuesManager();
+    // root scheduling context:
+    // we assume this to be the context of the root queue
+    root = createRoot();
     this.clock = clock;
   }
+
+  AbstractQueue createRoot() {
+    QueueSchedulingContext rootContext =
+      new QueueSchedulingContext("",100,-1,-1, -1,-1);
+    AbstractQueue root = new ContainerQueue(null,rootContext);
+    return root;
+  }
   
   /** mostly for testing purposes */
   public void setResourceManagerConf(CapacitySchedulerConf conf) {
     this.schedConf = conf;
   }
 
-  private void initializeMemoryRelatedConf() {
-    //handling @deprecated
-    if (conf.get(
-      CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY) !=
-      null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY));
-    }
-
-    //handling @deprecated
-    if (conf.get(CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY) !=
-      null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY));
-    }
-
-    if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY));
-    }
-
-    memSizeForMapSlotOnJT =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-    memSizeForReduceSlotOnJT =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
-            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    //handling @deprecated values
-    if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
-          " instead use " +JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY+
-          " and " + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY
-      );
-      
-      limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-      if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT &&
-        limitMaxMemForMapTasks >= 0) {
-        limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
-          limitMaxMemForMapTasks /
-            (1024 * 1024); //Converting old values in bytes to MB
-      }
-    } else {
-      limitMaxMemForMapTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-      limitMaxMemForReduceTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-    }
-    LOG.info(String.format("Scheduler configured with "
-        + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
-        + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
-        + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long
-        .valueOf(memSizeForReduceSlotOnJT), Long
-        .valueOf(limitMaxMemForMapTasks), Long
-        .valueOf(limitMaxMemForReduceTasks)));
-  }
-
-  long getMemSizeForMapSlot() {
-    return memSizeForMapSlotOnJT;
-  }
-
-  long getMemSizeForReduceSlot() {
-    return memSizeForReduceSlotOnJT;
-  }
-
-  long getLimitMaxMemForMapSlot() {
-    return limitMaxMemForMapTasks;
-  }
-
-  long getLimitMaxMemForReduceSlot() {
-    return limitMaxMemForReduceTasks;
-  }
-
   String[] getOrderedQueues(TaskType type) {
     if (type == TaskType.MAP) {
       return mapScheduler.getOrderedQueues();
@@ -1038,58 +695,23 @@
       schedConf = new CapacitySchedulerConf();
     }
 
-    initializeMemoryRelatedConf();
-    
+    MemoryMatcher.initializeMemoryRelatedConf(conf);
     // read queue info from config file
     QueueManager queueManager = taskTrackerManager.getQueueManager();
+
+    // Get parent-level queues. These are defined in mapred-*.xml.
     Set<String> queues = queueManager.getQueues();
-    // Sanity check: there should be at least one queue. 
+
+    // Sanity check: there should be at least one queue.
     if (0 == queues.size()) {
       throw new IllegalStateException("System has no queue configured");
     }
 
-    Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
-    float totalCapacity = 0.0f;
-    for (String queueName: queues) {
-      float capacity = schedConf.getCapacity(queueName);
-      if(capacity == -1.0) {
-        queuesWithoutConfiguredCapacity.add(queueName);
-      }else {
-        totalCapacity += capacity;
-      }
-      int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
-      // create our QSI and add to our hashmap
-      QueueSchedulingInfo qsi = new QueueSchedulingInfo(
-        queueName, capacity, ulMin, jobQueuesManager, schedConf.getMaxMapCap(
-          queueName), schedConf.getMaxReduceCap(queueName));
-      queueInfoMap.put(queueName, qsi);
-
-      // create the queues of job objects
-      boolean supportsPrio = schedConf.isPrioritySupported(queueName);
-      jobQueuesManager.createQueue(queueName, supportsPrio);
-      
-      SchedulingDisplayInfo schedulingInfo = 
-        new SchedulingDisplayInfo(queueName, this);
-      queueManager.setSchedulerInfo(queueName, schedulingInfo);
-      
-    }
-    float remainingQuantityToAllocate = 100 - totalCapacity;
-    float quantityToAllocate = 
-      remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
-    for(String queue: queuesWithoutConfiguredCapacity) {
-      QueueSchedulingInfo qsi = queueInfoMap.get(queue); 
-      qsi.capacityPercent = quantityToAllocate;
-      schedConf.setCapacity(queue, quantityToAllocate);
-    }    
-    
-    if (totalCapacity > 100.0) {
-      throw new IllegalArgumentException("Sum of queue capacities over 100% at "
-                                         + totalCapacity);
-    }    
-    
-    // let our mgr objects know about the queues
-    mapScheduler.initialize(queueInfoMap);
-    reduceScheduler.initialize(queueInfoMap);
+    QueueHierarchyBuilder builder = new QueueHierarchyBuilder(this.schedConf);
+    //load complete hierarchy of queues.
+    builder.createHierarchy(root,queues);
+    updateQueueMaps();
+    root.distributeUnConfiguredCapacity();
     
     // listen to job changes
     taskTrackerManager.addJobInProgressListener(jobQueuesManager);
@@ -1097,16 +719,32 @@
     //Start thread for initialization
     if (initializationPoller == null) {
       this.initializationPoller = new JobInitializationPoller(
-          jobQueuesManager,schedConf,queues, taskTrackerManager);
+          jobQueuesManager, taskTrackerManager);
     }
-    initializationPoller.init(queueManager.getQueues(), schedConf);
+    initializationPoller.init(jobQueuesManager.getJobQueueNames(), schedConf);
     initializationPoller.setDaemon(true);
     initializationPoller.start();
 
     started = true;
     LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  
   }
-  
+
+  /**
+   * Updates the classes and data structures that store queues with
+   * hierarchical queues. This API should be called only once after
+   * construction of the hierarchy.
+   */
+  private void updateQueueMaps() {
+    List<AbstractQueue> allQueues = getRoot().getDescendentJobQueues();
+    for (AbstractQueue queue : allQueues) {
+      if (queue instanceof JobQueue) {
+        jobQueuesManager.addQueue((JobQueue)queue);
+      }
+      addToQueueInfoMap(queue.getQueueSchedulingContext());  
+      createDisplayInfo(taskTrackerManager.getQueueManager(), queue.getName());
+    }
+  }
+
   /** mostly for testing purposes */
   void setInitializationPoller(JobInitializationPoller p) {
     this.initializationPoller = p;
@@ -1133,111 +771,28 @@
    * provided for the test classes
    * lets you update the QSI objects and sorted collections
    */ 
-  void updateQSIInfoForTests() {
+  void updateContextInfoForTests() {
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
     int reduceClusterCapacity = c.getMaxReduceTasks();
     // update the QSI objects
-    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
-    mapScheduler.updateCollectionOfQSIs();
-    reduceScheduler.updateCollectionOfQSIs();
+    updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
+    mapScheduler.scheduler.root.sort(mapScheduler.queueComparator);
+    reduceScheduler.scheduler.root.sort(reduceScheduler.queueComparator);
   }
 
   /**
-   * Update individual QSI objects.
+   * Update individual QSC objects.
    * We don't need exact information for all variables, just enough for us
    * to make scheduling decisions. For example, we don't need an exact count
    * of numRunningTasks. Once we count upto the grid capacity, any
    * number beyond that will make no difference.
    *
    **/
-  private synchronized void updateQSIObjects(int mapClusterCapacity,
+  private synchronized void updateContextObjects(int mapClusterCapacity,
       int reduceClusterCapacity) {
-    // if # of slots have changed since last time, update.
-    // First, compute whether the total number of TT slots have changed
-    for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
-      // compute new capacities, if TT slots have changed
-      if (mapClusterCapacity != prevMapClusterCapacity) {
-        qsi.mapTSI.setCapacity((int)
-          (qsi.capacityPercent*mapClusterCapacity/100));
-      }
-      if (reduceClusterCapacity != prevReduceClusterCapacity) {
-        qsi.reduceTSI.setCapacity((int)
-          (qsi.capacityPercent*reduceClusterCapacity/100));
-      }
-      // reset running/pending tasks, tasks per user
-      qsi.mapTSI.resetTaskVars();
-      qsi.reduceTSI.resetTaskVars();
-      // update stats on running jobs
-      for (JobInProgress j:
-        jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
-          continue;
-        }
+    root.update(mapClusterCapacity,reduceClusterCapacity);
 
-        int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
-        int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
-        int numRunningMapSlots = 
-          numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
-        int numRunningReduceSlots =
-          numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
-        int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
-        int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
-        int numReservedMapSlotsForThisJob = 
-          (mapScheduler.getNumReservedTaskTrackers(j) * 
-           mapScheduler.getSlotsPerTask(j)); 
-        int numReservedReduceSlotsForThisJob = 
-          (reduceScheduler.getNumReservedTaskTrackers(j) * 
-           reduceScheduler.getSlotsPerTask(j)); 
-        j.setSchedulingInfo(
-            String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
-                          Integer.valueOf(numMapsRunningForThisJob), 
-                          Integer.valueOf(numRunningMapSlots),
-                          Integer.valueOf(numReservedMapSlotsForThisJob),
-                          Integer.valueOf(numReducesRunningForThisJob), 
-                          Integer.valueOf(numRunningReduceSlots),
-                          Integer.valueOf(numReservedReduceSlotsForThisJob)));
-        qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
-        qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
-        qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
-        qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob;
-        Integer i =
-            qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
-        qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
-            Integer.valueOf(i.intValue() + numMapSlotsForThisJob));
-        i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
-        qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
-            Integer.valueOf(i.intValue() + numReduceSlotsForThisJob));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
-              + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
-              + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
-              + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
-              .getJobID().toString(), Integer
-              .valueOf(numMapsRunningForThisJob), Integer
-              .valueOf(numMapSlotsForThisJob), Integer
-              .valueOf(numReducesRunningForThisJob), Integer
-              .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
-              .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
-              .valueOf(j.failedMapTasks),
-              Integer.valueOf(j.failedReduceTasks), Integer
-                  .valueOf(j.speculativeMapTasks), Integer
-                  .valueOf(j.speculativeReduceTasks), Integer
-                  .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
-        }
-
-        /*
-         * it's fine walking down the entire list of running jobs - there
-         * probably will not be many, plus, we may need to go through the
-         * list to compute numSlotsOccupiedByUser. If this is expensive, we
-         * can keep a list of running jobs per user. Then we only need to
-         * consider the first few jobs per user.
-         */
-      }
-    }
-
-    prevMapClusterCapacity = mapClusterCapacity;
-    prevReduceClusterCapacity = reduceClusterCapacity;
   }
 
   /*
@@ -1271,7 +826,8 @@
     int currentMapSlots = taskTrackerStatus.countOccupiedMapSlots();
     int maxReduceSlots = taskTrackerStatus.getMaxReduceSlots();
     int currentReduceSlots = taskTrackerStatus.countOccupiedReduceSlots();
-    LOG.debug("TT asking for task, max maps=" + taskTrackerStatus.getMaxMapSlots() + 
+    LOG.debug("TT asking for task, max maps="
+      + taskTrackerStatus.getMaxMapSlots() + 
         ", run maps=" + taskTrackerStatus.countMapTasks() + ", max reds=" + 
         taskTrackerStatus.getMaxReduceSlots() + ", run reds=" + 
         taskTrackerStatus.countReduceTasks() + ", map cap=" + 
@@ -1279,19 +835,18 @@
         reduceClusterCapacity);
 
     /* 
-     * update all our QSI objects.
-     * This involves updating each qsi structure. This operation depends
+     * update all our QSC objects.
+     * This involves updating each qsC structure. This operation depends
      * on the number of running jobs in a queue, and some waiting jobs. If it
      * becomes expensive, do it once every few heartbeats only.
      */ 
-    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+    updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
     // make sure we get our map or reduce scheduling object to update its 
-    // collection of QSI objects too. 
+    // collection of QSC objects too.
 
     if ((maxReduceSlots - currentReduceSlots) > 
     (maxMapSlots - currentMapSlots)) {
       // get a reduce task first
-      reduceScheduler.updateCollectionOfQSIs();
       tlr = reduceScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
@@ -1304,7 +859,6 @@
                 TaskLookupResult.LookUpStatus.NO_TASK_FOUND
                                   == tlr.getLookUpStatus())
           && (maxMapSlots > currentMapSlots)) {
-        mapScheduler.updateCollectionOfQSIs();
         tlr = mapScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
           tlr.getLookUpStatus()) {
@@ -1314,7 +868,6 @@
     }
     else {
       // get a map task first
-      mapScheduler.updateCollectionOfQSIs();
       tlr = mapScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
@@ -1327,7 +880,6 @@
                 || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
                                     == tlr.getLookUpStatus())
           && (maxReduceSlots > currentReduceSlots)) {
-        reduceScheduler.updateCollectionOfQSIs();
         tlr = reduceScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
           tlr.getLookUpStatus()) {
@@ -1339,85 +891,17 @@
     return null;
   }
 
-  // called when a job is added
-  synchronized void jobAdded(JobInProgress job) throws IOException {
-    QueueSchedulingInfo qsi = 
-      queueInfoMap.get(job.getProfile().getQueueName());
-    // qsi shouldn't be null
-    // update user-specific info
-    Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
-    if (null == i) {
-      i = 1;
-      // set the count for running tasks to 0
-      qsi.mapTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
-          Integer.valueOf(0));
-      qsi.reduceTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
-          Integer.valueOf(0));
-    }
-    else {
-      i++;
-    }
-    qsi.numJobsByUser.put(job.getProfile().getUser(), i);
-    
-    // setup scheduler specific job information
-    preInitializeJob(job);
-    
-    LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
-              + job.getProfile().getUser() + ", user now has " + i + " jobs");
-  }
-
-  /**
-   * Setup {@link CapacityTaskScheduler} specific information prior to
-   * job initialization.
-   */
-  void preInitializeJob(JobInProgress job) {
-    JobConf jobConf = job.getJobConf();
-    
-    // Compute number of slots required to run a single map/reduce task
-    int slotsPerMap = 1;
-    int slotsPerReduce = 1;
-    if (memoryMatcher.isSchedulingBasedOnMemEnabled()) {
-      slotsPerMap = jobConf.computeNumSlotsPerMap(getMemSizeForMapSlot());
-     slotsPerReduce = 
-       jobConf.computeNumSlotsPerReduce(getMemSizeForReduceSlot());
-    }
-    job.setNumSlotsPerMap(slotsPerMap);
-    job.setNumSlotsPerReduce(slotsPerReduce);
-  }
-  
-  // called when a job completes
-  synchronized void jobCompleted(JobInProgress job) {
-    QueueSchedulingInfo qsi = 
-      queueInfoMap.get(job.getProfile().getQueueName());
-    // qsi shouldn't be null
-    // update numJobsByUser
-    LOG.debug("JOb to be removed for user " + job.getProfile().getUser());
-    Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
-    i--;
-    if (0 == i.intValue()) {
-      qsi.numJobsByUser.remove(job.getProfile().getUser());
-      // remove job footprint from our TSIs
-      qsi.mapTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
-      qsi.reduceTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
-      LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
-    }
-    else {
-      qsi.numJobsByUser.put(job.getProfile().getUser(), i);
-      LOG.debug("User still has " + i + " jobs, number of users = "
-                + qsi.numJobsByUser.size());
-    }
-  }
   
   @Override
   public synchronized Collection<JobInProgress> getJobs(String queueName) {
     Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
-    Collection<JobInProgress> runningJobs = 
-        jobQueuesManager.getRunningJobQueue(queueName);
+    Collection<JobInProgress> runningJobs =
+      jobQueuesManager.getJobQueue(queueName).getRunningJobs();
     if (runningJobs != null) {
       jobCollection.addAll(runningJobs);
     }
     Collection<JobInProgress> waitingJobs = 
-      jobQueuesManager.getWaitingJobs(queueName);
+      jobQueuesManager.getJobQueue(queueName).getWaitingJobs();
     Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
     if(waitingJobs != null) {
       tempCollection.addAll(waitingJobs);
@@ -1434,12 +918,43 @@
   }
 
   synchronized String getDisplayInfo(String queueName) {
-    QueueSchedulingInfo qsi = queueInfoMap.get(queueName);
-    if (null == qsi) { 
+    QueueSchedulingContext qsi = queueInfoMap.get(queueName);
+    if (null == qsi) {
       return null;
     }
     return qsi.toString();
   }
 
-}
+  synchronized void addToQueueInfoMap(QueueSchedulingContext qsc) {
+    queueInfoMap.put(qsc.getQueueName(), qsc);
+  }
+
+  void createDisplayInfo(QueueManager queueManager, String queueName) {
+    if (queueManager != null) {
+      SchedulingDisplayInfo schedulingInfo =
+        new SchedulingDisplayInfo(queueName, this);
+      queueManager.setSchedulerInfo(queueName, schedulingInfo);
+    }
+  }
+
+
+  /**
+   * Used for testing purposes.
+   * Returns the root of the queue hierarchy.
+   * @return the root queue
+   */
+  AbstractQueue getRoot() {
+    return this.root;
+  }
+
+
+  /**
+   * This is used for testing purposes only.
+   * Don't use this method.
+   * @param rt the queue to install as the new root
+   */
+  void setRoot(AbstractQueue rt) {
+    this.root = rt;
+  }
 
+}
\ No newline at end of file



Mime
View raw message