hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r732608 - in /hadoop/core/branches/branch-0.20: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Date Thu, 08 Jan 2009 05:04:09 GMT
Author: yhemanth
Date: Wed Jan  7 21:04:08 2009
New Revision: 732608

URL: http://svn.apache.org/viewvc?rev=732608&view=rev
Log:
Merge -r 732606:732607 from trunk to branch 0.20 to fix HADOOP-4980.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=732608&r1=732607&r2=732608&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Wed Jan  7 21:04:08 2009
@@ -292,6 +292,9 @@
     HADOOP-4830. Add end-to-end test cases for testing queue capacities.
     (Vinod Kumar Vavilapalli via yhemanth)
 
+    HADOOP-4980. Improve code layout of capacity scheduler to make it 
+    easier to fix some blocker bugs. (Vivek Ratan via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=732608&r1=732607&r2=732608&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Jan  7 21:04:08 2009
@@ -83,8 +83,6 @@
     // fraction of 'whenToExpire', but we store it here so we don't 
     // recompute it every time. 
     public long whenToKill;
-    // whether tasks have been killed for this resource
-    boolean tasksKilled;
     
     public ReclaimedResource(int amount, long expiryTime, 
         long whenToKill) {
@@ -92,48 +90,39 @@
       this.currentAmount = amount;
       this.whenToExpire = expiryTime;
       this.whenToKill = whenToKill;
-      this.tasksKilled = false;
     }
   }
 
-  /** 
-   * This class keeps track of scheduling info for each queue for either 
-   * Map or Reduce tasks. . 
-   * This scheduling information is used by the JT to decide how to allocate
-   * tasks, redistribute capacity, etc. 
-   */
-  private static class QueueSchedulingInfo {
-    String queueName;
+  /***********************************************************************
+   * Keeping track of scheduling information for queues
+   * 
+   * We need to maintain scheduling information relevant to a queue (its 
+   * name, guaranteed capacity, etc), along with information specific to 
+   * each kind of task, Map or Reduce (num of running tasks, pending 
+   * tasks etc). 
+   * 
+   * This scheduling information is used to decide how to allocate
+   * tasks, redistribute capacity, etc.
+   *  
+   * A QueueSchedulingInfo(QSI) object represents scheduling information for
+   * a queue. A TaskSchedulingInfo (TSI) object represents scheduling 
+   * information for a particular kind of task (Map or Reduce).
+   *   
+   **********************************************************************/
 
-    /** guaranteed capacity(%) is set at config time */ 
-    float guaranteedCapacityPercent = 0;
+  private static class TaskSchedulingInfo {
     /** 
      * the actual gc, which depends on how many slots are available
      * in the cluster at any given time. 
      */
     int guaranteedCapacity = 0;
-    
-    /** 
-     * we also keep track of how many tasks are running for all jobs in 
-     * the queue, and how many overall tasks there are. This info is 
-     * available for each job, but keeping a sum makes our algos faster.
-     */  
     // number of running tasks
     int numRunningTasks = 0;
     // number of pending tasks
     int numPendingTasks = 0;
-    
-    /** 
-     * to handle user limits, we need to know how many users have jobs in 
-     * the queue.
-     */  
-    Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
     /** for each user, we need to keep track of number of running tasks */
     Map<String, Integer> numRunningTasksByUser = 
       new HashMap<String, Integer>();
-      
-    /** min value of user limit (same for all users) */
-    int ulMin;
     
     /**
      * We need to keep track of resources to reclaim. 
@@ -154,12 +143,6 @@
      */
     
     /**
-     * reclaim time limit (in msec). This time represents the SLA we offer 
-     * a queue - a queue gets back any lost capacity withing this period 
-     * of time.  
-     */ 
-    long reclaimTime;  
-    /**
      * the list of resources to reclaim. This list is always sorted so that
      * resources that need to be reclaimed sooner occur earlier in the list.
      */
@@ -177,112 +160,186 @@
      * created.  
      */
     int numReclaimedResources = 0;
+
+    /**
+     * return information about the tasks
+     */
+    public String toString(){
+      float runningTasksAsPercent = guaranteedCapacity!= 0 ? 
+          ((float)numRunningTasks * 100/guaranteedCapacity):0;
+      StringBuffer sb = new StringBuffer();
+      sb.append("Guaranteed Capacity: " + guaranteedCapacity + "\n");
+      sb.append(String.format("Running tasks: %.1f%% of Guaranteed Capacity\n",
+          runningTasksAsPercent));
+      // include info on active users
+      if (numRunningTasks != 0) {
+        sb.append("Active users:\n");
+        for (Map.Entry<String, Integer> entry: numRunningTasksByUser.entrySet()) {
+          if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
+            // user has no tasks running
+            continue;
+          }
+          sb.append("User '" + entry.getKey()+ "': ");
+          float p = (float)entry.getValue().intValue()*100/numRunningTasks;
+          sb.append(String.format("%.1f%% of running tasks\n", p));
+        }
+      }
+      return sb.toString();
+    }
+  }
+  
+  private static class QueueSchedulingInfo {
+    String queueName;
+
+    /** guaranteed capacity(%) is set in the config */ 
+    float guaranteedCapacityPercent = 0;
+    
+    /** 
+     * to handle user limits, we need to know how many users have jobs in 
+     * the queue.
+     */  
+    Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
+      
+    /** min value of user limit (same for all users) */
+    int ulMin;
+    
+    /**
+     * reclaim time limit (in msec). This time represents the SLA we offer 
+     * a queue - a queue gets back any lost capacity withing this period 
+     * of time.  
+     */ 
+    long reclaimTime;
+    
+    /**
+     * We keep track of the JobQueuesManager only for reporting purposes 
+     * (in toString()). 
+     */
+    private JobQueuesManager jobQueuesManager;
     
-    public QueueSchedulingInfo(String queueName, float guaranteedCapacity, 
-        int ulMin, long reclaimTime) {
+    /**
+     * We keep a TaskSchedulingInfo object for each kind of task we support
+     */
+    TaskSchedulingInfo mapTSI;
+    TaskSchedulingInfo reduceTSI;
+    
+    public QueueSchedulingInfo(String queueName, float gcPercent, 
+        int ulMin, long reclaimTime, JobQueuesManager jobQueuesManager) {
       this.queueName = new String(queueName);
-      this.guaranteedCapacityPercent = guaranteedCapacity;
+      this.guaranteedCapacityPercent = gcPercent;
       this.ulMin = ulMin;
       this.reclaimTime = reclaimTime;
+      this.jobQueuesManager = jobQueuesManager;
+      this.mapTSI = new TaskSchedulingInfo();
+      this.reduceTSI = new TaskSchedulingInfo();
+    }
+    
+    /**
+     * return information about the queue
+     */
+    public String toString(){
+      // We print out the queue information first, followed by info
+      // on map and reduce tasks and job info
+      StringBuffer sb = new StringBuffer();
+      sb.append("Queue configuration\n");
+      //sb.append("Name: " + queueName + "\n");
+      sb.append("Guaranteed Capacity Percentage: ");
+      sb.append(guaranteedCapacityPercent);
+      sb.append("%\n");
+      sb.append(String.format("User Limit: %d%s\n",ulMin, "%"));
+      sb.append(String.format("Reclaim Time limit: %s\n", 
+          StringUtils.formatTime(reclaimTime)));
+      sb.append(String.format("Priority Supported: %s\n",
+          (jobQueuesManager.doesQueueSupportPriorities(queueName))?
+              "YES":"NO"));
+      sb.append("-------------\n");
+      
+      sb.append("Map tasks\n");
+      sb.append(mapTSI.toString());
+      sb.append("-------------\n");
+      sb.append("Reduce tasks\n");
+      sb.append(reduceTSI.toString());
+      sb.append("-------------\n");
+      
+      sb.append("Job info\n");
+      sb.append(String.format("Number of Waiting Jobs: %d\n", 
+          jobQueuesManager.getWaitingJobCount(queueName)));
+      sb.append(String.format("Number of users who have submitted jobs: %d\n", 
+          numJobsByUser.size()));
+      return sb.toString();
     }
   }
 
+  /** quick way to get qsi object given a queue name */
+  private Map<String, QueueSchedulingInfo> queueInfoMap = 
+    new HashMap<String, QueueSchedulingInfo>();
+  
   /**
-   * Top level scheduling information to be set to the queueManager
+   * This class captures scheduling information we want to display or log.
    */
-  
-  
-  private static class SchedulingInfo {
-    private QueueSchedulingInfo mqsi;
-    private QueueSchedulingInfo rqsi;
-    private boolean supportsPriority;
-    private JobQueuesManager mgr;
-    private long pollingInterval;
-    
-    
-    SchedulingInfo(QueueSchedulingInfo mqsi, 
-        QueueSchedulingInfo rqsi, boolean supportsPriority,
-        JobQueuesManager mgr, long pollingInterval) {
-      this.mqsi = mqsi;
-      this.rqsi = rqsi;
-      this.supportsPriority=supportsPriority;
-      this.mgr = mgr;
-      this.pollingInterval = pollingInterval;
+  private static class SchedulingDisplayInfo {
+    private String queueName;
+    CapacityTaskScheduler scheduler;
+    
+    SchedulingDisplayInfo(String queueName, CapacityTaskScheduler scheduler) { 
+      this.queueName = queueName;
+      this.scheduler = scheduler;
     }
     
     @Override
     public String toString(){
-      float runningMaps = 0;
-      float runningReduces = 0;
-      
-      Collection<JobInProgress> runningJobs = 
-        mgr.getRunningJobQueue(mqsi.queueName);
-      
-      for(JobInProgress job : runningJobs) {
-        runningMaps += job.runningMaps();
-        runningReduces += job.runningReduces();
-      }
-      float usedMaps = mqsi.guaranteedCapacity!= 0 ? 
-          (runningMaps * 100/mqsi.guaranteedCapacity):0;
-      float usedReduces = rqsi.guaranteedCapacity != 0 ? 
-          (runningReduces * 100/rqsi.guaranteedCapacity) :0;
-      StringBuffer sb = new StringBuffer();
-      sb.append("Guaranteed Capacity : ");
-      sb.append(mqsi.guaranteedCapacityPercent);
-      sb.append(" %\n");
-      sb.append(String.format("Guaranteed Capacity Maps : %d \n",
-          mqsi.guaranteedCapacity));
-      sb.append(String.format("Guaranteed Capacity Reduces : %d \n",
-          rqsi.guaranteedCapacity));
-      sb.append(String.format("User Limit : %d %s\n",mqsi.ulMin, "%"));
-      sb.append(String.format("Reclaim Time limit : %s \n", 
-          StringUtils.formatTime(mqsi.reclaimTime)));
-      sb.append(String.format("Priority Supported : %s \n",
-          supportsPriority?"YES":"NO"));
-      sb.append("-------------\n");
-      sb.append(String.format("Running Maps : %s %s\n",
-          Float.valueOf(usedMaps).toString(),
-          "% of Guaranteed Capacity"));
-      sb.append(String.format("Running Reduces : %s %s\n",
-          Float.valueOf(usedReduces).toString(),
-          "% of Guaranteed Capacity" ));
-      sb.append(String.format("Number of Waiting Jobs : %d \n", mgr
-          .getWaitingJobCount(mqsi.queueName)));
-      return sb.toString();
+      // note that we do not call updateQSIObjects() here for performance
+      // reasons. This means that the data we print out may be slightly
+      // stale. This data is updated whenever assignTasks() is called, or
+      // whenever the reclaim capacity thread runs, which should be fairly
+      // often. If neither of these happen, the data gets stale. If we see
+      // this often, we may need to detect this situation and call 
+      // updateQSIObjects(), or just call it each time. 
+      return scheduler.getDisplayInfo(queueName);
     }
   }
 
-  private static enum TaskLookUpStatus {
-    TASK_FOUND,
-    NO_TASK_IN_JOB,
-    NO_TASK_IN_QUEUE,
-    NO_TASK_MATCHING_MEMORY_REQUIREMENTS,
-  }
-
+  // this class encapsulates the result of a task lookup
   private static class TaskLookupResult {
 
-    private Task task;
-    private String lookupStatusInfo;
+    static enum LookUpStatus {
+      TASK_FOUND,
+      NO_TASK_FOUND,
+      TASK_FAILING_MEMORY_REQUIREMENT,
+    }
+    // constant TaskLookupResult objects. Should not be accessed directly.
+    private static final TaskLookupResult NoTaskLookupResult = 
+      new TaskLookupResult(null, TaskLookupResult.LookUpStatus.NO_TASK_FOUND);
+    private static final TaskLookupResult MemFailedLookupResult = 
+      new TaskLookupResult(null, 
+          TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT);
 
-    private TaskLookUpStatus lookUpStatus;
+    private LookUpStatus lookUpStatus;
+    private Task task;
 
-    TaskLookupResult(Task t, TaskLookUpStatus lUStatus, String statusInfo) {
+    // should not call this constructor directly. use static factory methods.
+    private TaskLookupResult(Task t, LookUpStatus lUStatus) {
       this.task = t;
       this.lookUpStatus = lUStatus;
-      this.lookupStatusInfo = statusInfo;
     }
+    
+    static TaskLookupResult getTaskFoundResult(Task t) {
+      return new TaskLookupResult(t, LookUpStatus.TASK_FOUND);
+    }
+    static TaskLookupResult getNoTaskFoundResult() {
+      return NoTaskLookupResult;
+    }
+    static TaskLookupResult getMemFailedResult() {
+      return MemFailedLookupResult;
+    }
+    
 
     Task getTask() {
       return task;
     }
 
-    TaskLookUpStatus getLookUpStatus() {
+    LookUpStatus getLookUpStatus() {
       return lookUpStatus;
     }
-
-    String getLookupStatusInfo() {
-      return lookupStatusInfo;
-    }
   }
 
   /** 
@@ -293,18 +350,14 @@
    */
   private static abstract class TaskSchedulingMgr {
 
-    /** quick way to get qsi object given a queue name */
-    private Map<String, QueueSchedulingInfo> queueInfoMap = 
-      new HashMap<String, QueueSchedulingInfo>();
     /** we keep track of the number of map or reduce slots we saw last */
-    private int numSlots = 0;
-    /** our enclosing TaskScheduler object */
+    private int prevClusterCapacity = 0;
+    /** our TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
-    // for debugging
+    // can be replaced with a global type, if we have one
     protected static enum TYPE {
       MAP, REDUCE
     }
-
     protected TYPE type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
@@ -313,6 +366,7 @@
     abstract int getRunningTasks(JobInProgress job);
     abstract int getPendingTasks(JobInProgress job);
     abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
+    abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
 
     /**
      * List of QSIs for assigning tasks.
@@ -324,35 +378,42 @@
      */  
     private List<QueueSchedulingInfo> qsiForAssigningTasks = 
       new ArrayList<QueueSchedulingInfo>();  
-    /** comparator to sort queues */ 
-    private static final class QueueComparator 
+    /** 
+     * Comparator to sort queues.
+     * For maps, we need to sort on QueueSchedulingInfo.mapTSI. For 
+     * reducers, we use reduceTSI. So we'll need separate comparators.  
+     */ 
+    private static abstract class QueueComparator 
       implements Comparator<QueueSchedulingInfo> {
+      abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
       public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
+        TaskSchedulingInfo t1 = getTSI(q1);
+        TaskSchedulingInfo t2 = getTSI(q2);
         // if one queue needs to reclaim something and the other one doesn't, 
         // the former is first
-        if ((0 == q1.reclaimList.size()) && (0 != q2.reclaimList.size())) {
+        if ((0 == t1.reclaimList.size()) && (0 != t2.reclaimList.size())) {
           return 1;
         }
-        else if ((0 != q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
+        else if ((0 != t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
           return -1;
         }
-        else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
+        else if ((0 == t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
           // neither needs to reclaim. If either doesn't have a capacity yet,
           // it comes at the end of the queue.
-          if ((q1.guaranteedCapacity == 0) &&
-                (q2.guaranteedCapacity != 0)) {
+          if ((t1.guaranteedCapacity == 0) &&
+                (t2.guaranteedCapacity != 0)) {
             return 1;
-          } else if ((q1.guaranteedCapacity != 0) &&
-                      (q2.guaranteedCapacity == 0)) {
+          } else if ((t1.guaranteedCapacity != 0) &&
+                      (t2.guaranteedCapacity == 0)) {
             return -1;
-          } else if ((q1.guaranteedCapacity == 0) &&
-                      (q2.guaranteedCapacity == 0)) {
+          } else if ((t1.guaranteedCapacity == 0) &&
+                      (t2.guaranteedCapacity == 0)) {
             // both don't have capacities, treat them as equal.
             return 0;
           } else {
             // look at how much capacity they've filled
-            double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
-            double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
+            double r1 = (double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
+            double r2 = (double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
             if (r1<r2) return -1;
             else if (r1>r2) return 1;
             else return 0;
@@ -360,32 +421,42 @@
         }
         else {
           // both have to reclaim. Look at which one needs to reclaim earlier
-          long t1 = q1.reclaimList.get(0).whenToKill;
-          long t2 = q2.reclaimList.get(0).whenToKill;
-          if (t1<t2) return -1;
-          else if (t1>t2) return 1;
+          long tm1 = t1.reclaimList.get(0).whenToKill;
+          long tm2 = t2.reclaimList.get(0).whenToKill;
+          if (tm1<tm2) return -1;
+          else if (tm1>tm2) return 1;
           else return 0;
         }
       }
     }
-    private final static QueueComparator queueComparator = new QueueComparator();
-
+    // subclass for map and reduce comparators
+    private static final class MapQueueComparator extends QueueComparator {
+      TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
+        return qsi.mapTSI;
+      }
+    }
+    private static final class ReduceQueueComparator extends QueueComparator {
+      TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
+        return qsi.reduceTSI;
+      }
+    }
+    // these are our comparator instances
+    protected final static MapQueueComparator mapComparator = new MapQueueComparator();
+    protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
+    // and this is the comparator to use
+    protected QueueComparator queueComparator;
    
     TaskSchedulingMgr(CapacityTaskScheduler sched) {
       scheduler = sched;
     }
     
-    private void add(QueueSchedulingInfo qsi) {
-      queueInfoMap.put(qsi.queueName, qsi);
-      qsiForAssigningTasks.add(qsi);
-    }
-    private int getNumQueues() {
-      return queueInfoMap.size();
-    }
-    private boolean isQueuePresent(String queueName) {
-      return queueInfoMap.containsKey(queueName);
+    // let the scheduling mgr know which queues are in the system
+    void initialize(Map<String, QueueSchedulingInfo> qsiMap) { 
+      // add all the qsi objects to our list and sort
+      qsiForAssigningTasks.addAll(qsiMap.values());
+      Collections.sort(qsiForAssigningTasks, queueComparator);
     }
-
+    
     /** 
      * Periodically, we walk through our queues to do the following: 
      * a. Check if a queue needs to reclaim any resources within a period
@@ -396,10 +467,6 @@
      */
     private synchronized void reclaimCapacity() {
       int tasksToKill = 0;
-      // with only one queue, there's nothing to do
-      if (queueInfoMap.size() < 2) {
-        return;
-      }
       
       // make sure we always get the latest values
       updateQSIObjects();
@@ -407,53 +474,55 @@
       
       QueueSchedulingInfo lastQsi = 
         qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
+      TaskSchedulingInfo lastTsi = getTSI(lastQsi);
       long currentTime = scheduler.clock.getTime();
-      for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
-        if (qsi.guaranteedCapacity <= 0) {
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        TaskSchedulingInfo tsi = getTSI(qsi);
+        if (tsi.guaranteedCapacity <= 0) {
           // no capacity, hence nothing can be reclaimed.
           continue;
         }
         // is there any resource that needs to be reclaimed? 
-        if ((!qsi.reclaimList.isEmpty()) &&  
-            (qsi.reclaimList.getFirst().whenToKill < 
+        if ((!tsi.reclaimList.isEmpty()) &&  
+            (tsi.reclaimList.getFirst().whenToKill < 
               currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) {
           // make a note of how many tasks to kill to claim resources
-          tasksToKill += qsi.reclaimList.getFirst().currentAmount;
+          tasksToKill += tsi.reclaimList.getFirst().currentAmount;
           // move this to expiry list
-          ReclaimedResource r = qsi.reclaimList.remove();
-          qsi.reclaimExpireList.add(r);
+          ReclaimedResource r = tsi.reclaimList.remove();
+          tsi.reclaimExpireList.add(r);
         }
         // is there any resource that needs to be expired?
-        if ((!qsi.reclaimExpireList.isEmpty()) && 
-            (qsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
-          ReclaimedResource r = qsi.reclaimExpireList.remove();
-          qsi.numReclaimedResources -= r.originalAmount;
+        if ((!tsi.reclaimExpireList.isEmpty()) && 
+            (tsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
+          ReclaimedResource r = tsi.reclaimExpireList.remove();
+          tsi.numReclaimedResources -= r.originalAmount;
         }
         // do we need to reclaim a resource later? 
         // if no queue is over capacity, there's nothing to reclaim
-        if (lastQsi.numRunningTasks <= lastQsi.guaranteedCapacity) {
+        if (lastTsi.numRunningTasks <= lastTsi.guaranteedCapacity) {
           continue;
         }
-        if (qsi.numRunningTasks < qsi.guaranteedCapacity) {
+        if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
           // usedCap is how much capacity is currently accounted for
-          int usedCap = qsi.numRunningTasks + qsi.numReclaimedResources;
+          int usedCap = tsi.numRunningTasks + tsi.numReclaimedResources;
           // see if we have remaining capacity and if we have enough pending 
           // tasks to use up remaining capacity
-          if ((usedCap < qsi.guaranteedCapacity) && 
-              ((qsi.numPendingTasks - qsi.numReclaimedResources)>0)) {
+          if ((usedCap < tsi.guaranteedCapacity) && 
+              ((tsi.numPendingTasks - tsi.numReclaimedResources)>0)) {
             // create a request for resources to be reclaimed
-            int amt = Math.min((qsi.guaranteedCapacity-usedCap), 
-                (qsi.numPendingTasks - qsi.numReclaimedResources));
+            int amt = Math.min((tsi.guaranteedCapacity-usedCap), 
+                (tsi.numPendingTasks - tsi.numReclaimedResources));
             // create a rsource object that needs to be reclaimed some time
             // in the future
             long whenToKill = qsi.reclaimTime - 
               (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * 
                   scheduler.taskTrackerManager.getNextHeartbeatInterval());
             if (whenToKill < 0) whenToKill = 0;
-            qsi.reclaimList.add(new ReclaimedResource(amt, 
+            tsi.reclaimList.add(new ReclaimedResource(amt, 
                 currentTime + qsi.reclaimTime, 
                 currentTime + whenToKill));
-            qsi.numReclaimedResources += amt;
+            tsi.numReclaimedResources += amt;
             LOG.debug("Queue " + qsi.queueName + " needs to reclaim " + 
                 amt + " resources");
           }
@@ -484,7 +553,7 @@
       int loc;
       for (loc=0; loc<qsiForAssigningTasks.size(); loc++) {
         QueueSchedulingInfo qsi = qsiForAssigningTasks.get(loc);
-        if (qsi.numRunningTasks > qsi.guaranteedCapacity) {
+        if (getTSI(qsi).numRunningTasks > getTSI(qsi).guaranteedCapacity) {
           // all queues from here onwards are running over cap
           break;
         }
@@ -500,13 +569,15 @@
       int tasksOverCap = 0;
       for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
         QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
-        tasksOverCap += (qsi.numRunningTasks - qsi.guaranteedCapacity);
+        tasksOverCap += 
+          (getTSI(qsi).numRunningTasks - getTSI(qsi).guaranteedCapacity);
       }
       // now kill tasks from each queue
       for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
         QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
         killTasksFromQueue(qsi, (int)Math.round(
-            ((double)(qsi.numRunningTasks - qsi.guaranteedCapacity))*
+            ((double)(getTSI(qsi).numRunningTasks - 
+                getTSI(qsi).guaranteedCapacity))*
             tasksToKill/(double)tasksOverCap));
       }
     }
@@ -561,18 +632,18 @@
     private synchronized void updateQSIObjects() {
       // if # of slots have changed since last time, update. 
       // First, compute whether the total number of TT slots have changed
-      int slotsDiff = getClusterCapacity()- numSlots;
-      numSlots += slotsDiff;
-      for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+      int currentClusterCapacity = getClusterCapacity();
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        TaskSchedulingInfo tsi = getTSI(qsi);
         // compute new GCs and ACs, if TT slots have changed
-        if (slotsDiff != 0) {
-          qsi.guaranteedCapacity =
-            (int)(qsi.guaranteedCapacityPercent*numSlots/100);
-        }
-        qsi.numRunningTasks = 0;
-        qsi.numPendingTasks = 0;
-        for (String s: qsi.numRunningTasksByUser.keySet()) {
-          qsi.numRunningTasksByUser.put(s, 0);
+        if (currentClusterCapacity != prevClusterCapacity) {
+          tsi.guaranteedCapacity =
+            (int)(qsi.guaranteedCapacityPercent*currentClusterCapacity/100);
+        }
+        tsi.numRunningTasks = 0;
+        tsi.numPendingTasks = 0;
+        for (String s: tsi.numRunningTasksByUser.keySet()) {
+          tsi.numRunningTasksByUser.put(s, 0);
         }
         // update stats on running jobs
         for (JobInProgress j: 
@@ -580,11 +651,11 @@
           if (j.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;
           }
-          qsi.numRunningTasks += getRunningTasks(j);
-          Integer i = qsi.numRunningTasksByUser.get(j.getProfile().getUser());
-          qsi.numRunningTasksByUser.put(j.getProfile().getUser(), 
+          tsi.numRunningTasks += getRunningTasks(j);
+          Integer i = tsi.numRunningTasksByUser.get(j.getProfile().getUser());
+          tsi.numRunningTasksByUser.put(j.getProfile().getUser(), 
               i+getRunningTasks(j));
-          qsi.numPendingTasks += getPendingTasks(j);
+          tsi.numPendingTasks += getPendingTasks(j);
           LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
               j.runningMaps() + ", run(r) = " + j.runningReduces() + 
               ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
@@ -606,7 +677,7 @@
         for(JobInProgress j : 
           scheduler.jobQueuesManager.getJobs(qsi.queueName)) {
           // pending tasks
-          if(qsi.numPendingTasks > getClusterCapacity()) {
+          if(tsi.numPendingTasks > currentClusterCapacity) {
             // that's plenty. no need for more computation
             break;
           }
@@ -621,67 +692,27 @@
            * poller to walk through the job queue to clean up killed jobs.
            */
           if (j.getStatus().getRunState() == JobStatus.PREP) {
-            qsi.numPendingTasks += getPendingTasks(j);
+            tsi.numPendingTasks += getPendingTasks(j);
           }
         }
       }
-    }
-
-    void jobAdded(JobInProgress job) {
-      // update qsi 
-      QueueSchedulingInfo qsi = 
-        queueInfoMap.get(job.getProfile().getQueueName());
-      // qsi shouldn't be null
-      
-      // update user-specific info
-      Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
-      if (null == i) {
-        i = 1;
-        qsi.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
-      }
-      else {
-        i++;
-      }
-      qsi.numJobsByUser.put(job.getProfile().getUser(), i);
-      LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
-                + job.getProfile().getUser() + ", user now has " + i + " jobs");
-    }
-
-    void jobRemoved(JobInProgress job) {
-      // update qsi 
-      QueueSchedulingInfo qsi = 
-        queueInfoMap.get(job.getProfile().getQueueName());
-      // qsi shouldn't be null
-      
-      // update numJobsByUser
-      LOG.debug("JOb to be removed for user " + job.getProfile().getUser());
-      Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
-      i--;
-      if (0 == i.intValue()) {
-        qsi.numJobsByUser.remove(job.getProfile().getUser());
-        qsi.numRunningTasksByUser.remove(job.getProfile().getUser());
-        LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
-      }
-      else {
-        qsi.numJobsByUser.put(job.getProfile().getUser(), i);
-        LOG.debug("User still has " + i + " jobs, number of users = "
-                  + qsi.numJobsByUser.size());
-      }
+      prevClusterCapacity = currentClusterCapacity;
     }
 
     // called when a task is allocated to queue represented by qsi. 
     // update our info about reclaimed resources
     private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
+      TaskSchedulingInfo tsi = getTSI(qsi);
       // if we needed to reclaim resources, we have reclaimed one
-      if (qsi.reclaimList.isEmpty()) {
+      if (tsi.reclaimList.isEmpty()) {
         return;
       }
-      ReclaimedResource res = qsi.reclaimList.getFirst();
+      ReclaimedResource res = tsi.reclaimList.getFirst();
       res.currentAmount--;
       if (0 == res.currentAmount) {
         // move this resource to the expiry list
-        ReclaimedResource r = qsi.reclaimList.remove();
-        qsi.reclaimExpireList.add(r);
+        ReclaimedResource r = tsi.reclaimList.remove();
+        tsi.reclaimExpireList.add(r);
       }
     }
 
@@ -690,26 +721,24 @@
     }
 
 
-    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+    private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) {
       // what is our current capacity? It's GC if we're running below GC. 
       // If we're running over GC, then it's #running plus 1 (which is the 
       // extra slot we're getting). 
       int currentCapacity;
-      if (qsi.numRunningTasks < qsi.guaranteedCapacity) {
-        currentCapacity = qsi.guaranteedCapacity;
+      TaskSchedulingInfo tsi = getTSI(qsi);
+      if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
+        currentCapacity = tsi.guaranteedCapacity;
       }
       else {
-        currentCapacity = qsi.numRunningTasks+1;
+        currentCapacity = tsi.numRunningTasks+1;
       }
       int limit = Math.max((int)(Math.ceil((double)currentCapacity/
           (double)qsi.numJobsByUser.size())), 
           (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
-      if (qsi.numRunningTasksByUser.get(
-          j.getProfile().getUser()) >= limit) {
-        LOG.debug("User " + j.getProfile().getUser() + 
-            " is over limit, num running tasks = " + 
-            qsi.numRunningTasksByUser.get(j.getProfile().getUser()) + 
-            ", limit = " + limit);
+      if (tsi.numRunningTasksByUser.get(user) >= limit) {
+        LOG.debug("User " + user + " is over limit, num running tasks = " + 
+            tsi.numRunningTasksByUser.get(user) + ", limit = " + limit);
         return true;
       }
       else {
@@ -717,136 +746,99 @@
       }
     }
 
+    /*
+     * This is the central scheduling method. 
+     * It tries to get a task from jobs in a single queue. 
+     * Always return a TaskLookupResult object. Don't return null. 
+     */
     private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
         QueueSchedulingInfo qsi)
         throws IOException {
 
-      // keep track of users over limit
-      Set<String> usersOverLimit = new HashSet<String>();
-
-      // Look at running jobs first, skipping jobs of those users who are over
-      // their limits
-      TaskLookupResult result =
-          getTaskFromRunningJobQueue(taskTracker, qsi, usersOverLimit, true);
-      TaskLookUpStatus lookUpStatus = result.getLookUpStatus();
-      if (lookUpStatus == TaskLookUpStatus.TASK_FOUND
-          || lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
-        // No need for looking elsewhere
-        return result;
-      }
-
-      // if we're here, we haven't found anything. This could be because
-      // there is nothing to run, or that the user limit for some user is
-      // too strict, i.e., there's at least one user who doesn't have
-      // enough tasks to satisfy his limit. If it's the later case, look at
-      // jobs without considering user limits, and get task from first
-      // eligible job
-      if (usersOverLimit.size() > 0) {
-        // look at running jobs, considering users over limit
-        result =
-            getTaskFromRunningJobQueue(taskTracker, qsi, usersOverLimit, false);
-        lookUpStatus = result.getLookUpStatus();
-        if (lookUpStatus == TaskLookUpStatus.TASK_FOUND
-            || lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
-          // No need for looking elsewhere
-          return result;
-        }
-      }
-
-      // found nothing for this queue, look at the next one.
-      String msg = "Found no task from the queue" + qsi.queueName;
-      LOG.info(msg);
-      return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
-          msg);
-    }
-
-    // get a task from the running queue
-    private TaskLookupResult getTaskFromRunningJobQueue(
-        TaskTrackerStatus taskTracker, QueueSchedulingInfo qsi,
-        Set<String> usersOverLimit, boolean skipUsersOverLimit)
-        throws IOException {
+      // we only look at jobs in the running queues, as these are the ones
+      // who have been potentially initialized
 
-      for (JobInProgress j : scheduler.jobQueuesManager
-          .getRunningJobQueue(qsi.queueName)) {
-        // some jobs may be in the running queue but may have completed
-        // and not yet have been removed from the running queue
+      for (JobInProgress j : 
+        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+        // only look at jobs that can be run. We ignore jobs that haven't 
+        // initialized, or have completed but haven't been removed from the 
+        // running queue. 
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-
-        if (skipUsersOverLimit) {
-          // consider jobs of only those users who are under limits
-          if (isUserOverLimit(j, qsi)) {
-            usersOverLimit.add(j.getProfile().getUser());
-            continue;
+        // check if the job's user is over limit
+        if (isUserOverLimit(j.getProfile().getUser(), qsi)) {
+          continue;
+        }
+        if (getPendingTasks(j) != 0) {
+          // Not accurate TODO:
+          // check if the job's memory requirements are met
+          if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+            // We found a suitable job. Get task from it.
+            Task t = obtainNewTask(taskTracker, j);
+            if (t != null) {
+              // we're successful in getting a task
+              return TaskLookupResult.getTaskFoundResult(t);
+            }
           }
-        } else {
-          // consider jobs of only those users who are over limit
-          if (!usersOverLimit.contains(j.getProfile().getUser())) {
-            continue;
+          else {
+            // mem requirements not met. Rather than look at the next job, 
+            // we return nothing to the TT, with the hope that we improve 
+            // chances of finding a suitable TT for this job. This lets us
+            // avoid starving jobs with high mem requirements.         
+            return TaskLookupResult.getMemFailedResult();
           }
         }
+        // if we're here, this job has no task to run. Look at the next job.
+      }
 
-        // We found a suitable job. Try getting a task from it.
-        TaskLookupResult tlr = getTaskFromJob(j, taskTracker, qsi);
-        TaskLookUpStatus lookUpStatus = tlr.getLookUpStatus();
-        if (lookUpStatus == TaskLookUpStatus.NO_TASK_IN_JOB) {
-          // Go to the next job in the same queue.
+      // if we're here, we haven't found any task to run among all jobs in 
+      // the queue. This could be because there is nothing to run, or that 
+      // the user limit for some user is too strict, i.e., there's at least 
+      // one user who doesn't have enough tasks to satisfy his limit. If 
+      // it's the latter case, re-look at jobs without considering user 
+      // limits, and get a task from the first eligible job
+      // Note: some of the code from above is repeated here. This is on 
+      // purpose as it improves overall readability.  
+      // Note: we walk through jobs again. Some of these jobs, which weren't
+      // considered in the first pass, shouldn't be considered here again, 
+      // but we still check for their viability to keep the code simple. In
+      // some cases, for high mem jobs that have nothing to run, we call 
+      // obtainNewTask() unnecessarily. Should this be a problem, we can 
+      // create a list of jobs to look at (those whose users were over 
+      // limit) in the first pass and walk through that list only. 
+      for (JobInProgress j : 
+        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
-        } else if (lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS
-            || lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
-          // No need for considering the next jobs in this queue.
-          return tlr;
         }
-      }
-
-      String msg =
-          qsi.queueName + " queue's running jobs queue don't have "
-              + "any more tasks to run.";
-      LOG.info(msg);
-      return new TaskLookupResult(null,
-          TaskLookUpStatus.NO_TASK_IN_QUEUE, msg);
-    }
-
-    private TaskLookupResult getTaskFromJob(JobInProgress j,
-        TaskTrackerStatus taskTracker, QueueSchedulingInfo qsi)
-        throws IOException {
-      String msg;
-
-      if (getPendingTasks(j) != 0) {
-        // Not accurate TODO:
-        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
-          // We found a suitable job. Get task from it.
-          Task t = obtainNewTask(taskTracker, j);
-          if (t != null) {
-            msg =
-                "Got task from job " + j.getJobID() + " in queue "
-                    + qsi.queueName;
-            LOG.debug(msg);
-            return new TaskLookupResult(t, TaskLookUpStatus.TASK_FOUND, msg);
+        if (getPendingTasks(j) != 0) {
+          // Not accurate TODO:
+          // check if the job's memory requirements are met
+          if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+            // We found a suitable job. Get task from it.
+            Task t = obtainNewTask(taskTracker, j);
+            if (t != null) {
+              // we're successful in getting a task
+              return TaskLookupResult.getTaskFoundResult(t);
+            }
+          }
+          else {
+            // mem requirements not met. 
+            return TaskLookupResult.getMemFailedResult();
           }
-        } else {
-          // block the cluster, till this job's tasks can be scheduled.
-          msg =
-              j.getJobID() + "'s tasks don't fit on the TaskTracker "
-                  + taskTracker.trackerName
-                  + ". Returning no task to the taskTracker";
-          LOG.info(msg);
-          return new TaskLookupResult(null,
-              TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS, msg);
         }
+        // if we're here, this job has no task to run. Look at the next job.
       }
 
-      msg = j.getJobID() + " doesn't have any more tasks to run.";
+      // found nothing for this queue, look at the next one.
+      String msg = "Found no task from the queue " + qsi.queueName;
       LOG.debug(msg);
-      return new TaskLookupResult(null,
-          TaskLookUpStatus.NO_TASK_IN_JOB, msg);
+      return TaskLookupResult.getNoTaskFoundResult();
     }
 
-    // don't return null
+    // Always return a TaskLookupResult object. Don't return null. 
     private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
-      Task t = null;
-
       /* 
        * update all our QSI objects.
        * This involves updating each qsi structure. This operation depends
@@ -863,62 +855,51 @@
        */
       updateCollectionOfQSIs();
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        if (qsi.guaranteedCapacity <= 0.0f) {
+        if (getTSI(qsi).guaranteedCapacity <= 0.0f) {
           // No capacity is guaranteed yet for this queue.
           // Queues are sorted so that ones without capacities
           // come towards the end. Hence, we can simply return
           // from here without considering any further queues.
-          return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
-              null);
+          return TaskLookupResult.getNoTaskFoundResult();
         }
-
         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
-        TaskLookUpStatus lookUpStatus = tlr.getLookUpStatus();
+        TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
 
-        if (lookUpStatus == TaskLookUpStatus.NO_TASK_IN_QUEUE) {
+        if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
           continue; // Look in other queues.
         }
 
-        if (lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
+        // if we find a task, return
+        if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) {
           // we have a task. Update reclaimed resource info
           updateReclaimedResources(qsi);
           return tlr;
         }
-        
-        if (lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
-          // blocking the cluster.
-          String msg = tlr.getLookupStatusInfo();
-          if (msg != null) {
-            LOG.warn(msg);
-            LOG.warn("Returning nothing to the Tasktracker "
-                + taskTracker.trackerName);
+        // if there was a memory mismatch, return
+        else if (lookUpStatus == 
+          TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {
             return tlr;
-          }
         }
       }
 
       // nothing to give
-      return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
-          null);
+      return TaskLookupResult.getNoTaskFoundResult();
     }
     
     private void printQSIs() {
       StringBuffer s = new StringBuffer();
       for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        TaskSchedulingInfo tsi = getTSI(qsi);
         Collection<JobInProgress> runJobs = 
           scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
         s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
-            qsi.numRunningTasks + ", gc=" + qsi.guaranteedCapacity + 
-            ", wait=" + qsi.numPendingTasks + ", run jobs="+ runJobs.size() + 
+            tsi.numRunningTasks + ", gc=" + tsi.guaranteedCapacity + 
+            ", wait=" + tsi.numPendingTasks + ", run jobs="+ runJobs.size() + 
             "*** ");
       }
       LOG.debug(s);
     }
     
-    public QueueSchedulingInfo getQueueSchedulingInfo(String queueName) {
-      return this.queueInfoMap.get(queueName) ;
-    }
-
   }
 
   /**
@@ -928,6 +909,7 @@
     MapSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
       type = TaskSchedulingMgr.TYPE.MAP;
+      queueComparator = mapComparator;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
@@ -995,6 +977,9 @@
       }
       return tasksKilled;
     }
+    TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
+      return qsi.mapTSI;
+    }
 
   }
 
@@ -1005,6 +990,7 @@
     ReduceSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
       type = TaskSchedulingMgr.TYPE.REDUCE;
+      queueComparator = reduceComparator;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
@@ -1047,6 +1033,9 @@
       }
       return tasksKilled;
     }
+    TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
+      return qsi.reduceTSI;
+    }
   }
   
   /** the scheduling mgrs for Map and Reduce tasks */ 
@@ -1066,7 +1055,7 @@
 
   static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
   protected JobQueuesManager jobQueuesManager;
-  protected CapacitySchedulerConf rmConf;
+  protected CapacitySchedulerConf schedConf;
   /** whether scheduler has started or not */
   private boolean started = false;
   
@@ -1125,7 +1114,7 @@
   
   /** mostly for testing purposes */
   public void setResourceManagerConf(CapacitySchedulerConf conf) {
-    this.rmConf = conf;
+    this.schedConf = conf;
   }
 
   /**
@@ -1148,14 +1137,14 @@
             JobConf.DISABLED_MEMORY_LIMIT));
 
     limitMaxPmemForTasks =
-        normalizeMemoryConfigValue(rmConf.getLimitMaxPmemForTasks());
+        normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks());
 
     defaultMaxVmPerTask =
         normalizeMemoryConfigValue(conf.getLong(
             JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
 
-    defaultPercentOfPmemInVmem = rmConf.getDefaultPercentOfPmemInVmem();
+    defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem();
     if (defaultPercentOfPmemInVmem < 0) {
       defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
     }
@@ -1166,45 +1155,45 @@
     if (started) return;
     super.start();
     // initialize our queues from the config settings
-    if (null == rmConf) {
-      rmConf = new CapacitySchedulerConf();
+    if (null == schedConf) {
+      schedConf = new CapacitySchedulerConf();
     }
 
     initializeMemoryRelatedConf();
     
-    RECLAIM_CAPACITY_INTERVAL = rmConf.getReclaimCapacityInterval();
+    RECLAIM_CAPACITY_INTERVAL = schedConf.getReclaimCapacityInterval();
     RECLAIM_CAPACITY_INTERVAL *= 1000;
 
     // read queue info from config file
     QueueManager queueManager = taskTrackerManager.getQueueManager();
     Set<String> queues = queueManager.getQueues();
+    // Sanity check: there should be at least one queue. 
+    if (0 == queues.size()) {
+      throw new IllegalStateException("System has no queue configured");
+    }
+
     Set<String> queuesWithoutConfiguredGC = new HashSet<String>();
     float totalCapacity = 0.0f;
     for (String queueName: queues) {
-      float gc = rmConf.getGuaranteedCapacity(queueName); 
+      float gc = schedConf.getGuaranteedCapacity(queueName); 
       if(gc == -1.0) {
         queuesWithoutConfiguredGC.add(queueName);
       }else {
         totalCapacity += gc;
       }
-      int ulMin = rmConf.getMinimumUserLimitPercent(queueName); 
-      long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName) * 1000;
-      // reclaimTimeLimit is the time(in millisec) within which we need to
-      // reclaim capacity. 
-      // create queue scheduling objects for Map and Reduce
-      mapScheduler.add(new QueueSchedulingInfo(queueName, gc, 
-          ulMin, reclaimTimeLimit));
-      reduceScheduler.add(new QueueSchedulingInfo(queueName, gc, 
-          ulMin, reclaimTimeLimit));
+      int ulMin = schedConf.getMinimumUserLimitPercent(queueName); 
+      long reclaimTimeLimit = schedConf.getReclaimTimeLimit(queueName) * 1000;
+      // create our QSI and add to our hashmap
+      QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc, 
+          ulMin, reclaimTimeLimit, jobQueuesManager);
+      queueInfoMap.put(queueName, qsi);
 
       // create the queues of job objects
-      boolean supportsPrio = rmConf.isPrioritySupported(queueName);
+      boolean supportsPrio = schedConf.isPrioritySupported(queueName);
       jobQueuesManager.createQueue(queueName, supportsPrio);
       
-      SchedulingInfo schedulingInfo = new SchedulingInfo(
-          mapScheduler.getQueueSchedulingInfo(queueName),
-          reduceScheduler.getQueueSchedulingInfo(queueName),supportsPrio,
-          jobQueuesManager,rmConf.getSleepInterval());
+      SchedulingDisplayInfo schedulingInfo = 
+        new SchedulingDisplayInfo(queueName, this);
       queueManager.setSchedulerInfo(queueName, schedulingInfo);
       
     }
@@ -1212,26 +1201,23 @@
     float quantityToAllocate = 
       remainingQuantityToAllocate/queuesWithoutConfiguredGC.size();
     for(String queue: queuesWithoutConfiguredGC) {
-      QueueSchedulingInfo schedulingInfo = 
-        mapScheduler.getQueueSchedulingInfo(queue);
-      schedulingInfo.guaranteedCapacityPercent = quantityToAllocate;
-      schedulingInfo = reduceScheduler.getQueueSchedulingInfo(queue);
-      schedulingInfo.guaranteedCapacityPercent = quantityToAllocate;
-      rmConf.setGuaranteedCapacity(queue, quantityToAllocate);
+      QueueSchedulingInfo qsi = queueInfoMap.get(queue); 
+      qsi.guaranteedCapacityPercent = quantityToAllocate;
+      schedConf.setGuaranteedCapacity(queue, quantityToAllocate);
+    }    
+    
+    // check if there's a queue with the default name. If not, we quit.
+    if (!queueInfoMap.containsKey(DEFAULT_QUEUE_NAME)) {
+      throw new IllegalStateException("System has no default queue configured");
     }
     if (totalCapacity > 100.0) {
       throw new IllegalArgumentException("Sum of queue capacities over 100% at "
                                          + totalCapacity);
     }    
-    // Sanity check: there should be at least one queue. 
-    if (0 == mapScheduler.getNumQueues()) {
-      throw new IllegalStateException("System has no queue configured");
-    }
     
-    // check if there's a queue with the default name. If not, we quit.
-    if (!mapScheduler.isQueuePresent(DEFAULT_QUEUE_NAME)) {
-      throw new IllegalStateException("System has no default queue configured");
-    }
+    // let our mgr objects know about the queues
+    mapScheduler.initialize(queueInfoMap);
+    reduceScheduler.initialize(queueInfoMap);
     
     // listen to job changes
     taskTrackerManager.addJobInProgressListener(jobQueuesManager);
@@ -1239,18 +1225,25 @@
     //Start thread for initialization
     if (initializationPoller == null) {
       this.initializationPoller = new JobInitializationPoller(
-          jobQueuesManager,rmConf,queues);
+          jobQueuesManager,schedConf,queues);
     }
-    initializationPoller.init(queueManager.getQueues(), rmConf);
+    initializationPoller.init(queueManager.getQueues(), schedConf);
     initializationPoller.setDaemon(true);
     initializationPoller.start();
-    // start thread for redistributing capacity
-    this.reclaimCapacityThread = 
-      new Thread(new ReclaimCapacity(),"reclaimCapacity");
-    this.reclaimCapacityThread.start();
+
+    // start thread for redistributing capacity if we have more than 
+    // one queue
+    if (queueInfoMap.size() > 1) {
+      this.reclaimCapacityThread = 
+        new Thread(new ReclaimCapacity(),"reclaimCapacity");
+      this.reclaimCapacityThread.start();
+    }
+    else {
+      LOG.info("Only one queue present. Reclaim capacity thread not started.");
+    }
+    
     started = true;
-    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");
-  }
+    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  }
   
   /** mostly for testing purposes */
   void setInitializationPoller(JobInitializationPoller p) {
@@ -1326,26 +1319,27 @@
     int currentMapTasks = taskTracker.countMapTasks();
     int maxReduceTasks = taskTracker.getMaxReduceTasks();
     int currentReduceTasks = taskTracker.countReduceTasks();
-    
+
     if ((maxReduceTasks - currentReduceTasks) > 
     (maxMapTasks - currentMapTasks)) {
       // get a reduce task first
       tlr = reduceScheduler.assignTasks(taskTracker);
-      if (TaskLookUpStatus.TASK_FOUND == 
+      if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
         // found a task; return
         return Collections.singletonList(tlr.getTask());
       }
-      else if (TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS == 
+      else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
         tlr.getLookUpStatus()) {
         // return no task
         return null;
       }
       // if we didn't get any, look at map tasks, if TT has space
-      else if ((TaskLookUpStatus.NO_TASK_IN_QUEUE == 
+      else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
         tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
         tlr = mapScheduler.assignTasks(taskTracker);
-        if (TaskLookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+        if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
+          tlr.getLookUpStatus()) {
           return Collections.singletonList(tlr.getTask());
         }
       }
@@ -1353,21 +1347,21 @@
     else {
       // get a map task first
       tlr = mapScheduler.assignTasks(taskTracker);
-      if (TaskLookUpStatus.TASK_FOUND == 
+      if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
         // found a task; return
         return Collections.singletonList(tlr.getTask());
       }
-      else if (TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS == 
+      else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
         tlr.getLookUpStatus()) {
-        // return no task
         return null;
       }
       // if we didn't get any, look at reduce tasks, if TT has space
-      else if ((TaskLookUpStatus.NO_TASK_IN_QUEUE == 
+      else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
         tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
         tlr = reduceScheduler.assignTasks(taskTracker);
-        if (TaskLookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+        if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
+          tlr.getLookUpStatus()) {
           return Collections.singletonList(tlr.getTask());
         }
       }
@@ -1410,10 +1404,23 @@
 
   // called when a job is added
   synchronized void jobAdded(JobInProgress job) throws IOException {
-    // let our map and reduce schedulers know this, so they can update 
-    // user-specific info
-    mapScheduler.jobAdded(job);
-    reduceScheduler.jobAdded(job);
+    QueueSchedulingInfo qsi = 
+      queueInfoMap.get(job.getProfile().getQueueName());
+    // qsi shouldn't be null
+    // update user-specific info
+    Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
+    if (null == i) {
+      i = 1;
+      // set the count for running tasks to 0
+      qsi.mapTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
+      qsi.reduceTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
+    }
+    else {
+      i++;
+    }
+    qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+    LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
+              + job.getProfile().getUser() + ", user now has " + i + " jobs");
 
     // Kill the job if it cannot run in the cluster because of invalid
     // resource requirements.
@@ -1425,10 +1432,25 @@
 
   // called when a job completes
   synchronized void jobCompleted(JobInProgress job) {
-    // let our map and reduce schedulers know this, so they can update 
-    // user-specific info
-    mapScheduler.jobRemoved(job);
-    reduceScheduler.jobRemoved(job);
+    QueueSchedulingInfo qsi = 
+      queueInfoMap.get(job.getProfile().getQueueName());
+    // qsi shouldn't be null
+    // update numJobsByUser
+    LOG.debug("JOb to be removed for user " + job.getProfile().getUser());
+    Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
+    i--;
+    if (0 == i.intValue()) {
+      qsi.numJobsByUser.remove(job.getProfile().getUser());
+      // remove job footprint from our TSIs
+      qsi.mapTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
+      qsi.reduceTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
+      LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
+    }
+    else {
+      qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+      LOG.debug("User still has " + i + " jobs, number of users = "
+                + qsi.numJobsByUser.size());
+    }
   }
   
   @Override
@@ -1455,5 +1477,14 @@
   JobInitializationPoller getInitializationPoller() {
     return initializationPoller;
   }
+
+  synchronized String getDisplayInfo(String queueName) {
+    QueueSchedulingInfo qsi = queueInfoMap.get(queueName);
+    if (null == qsi) { 
+      return null;
+    }
+    return qsi.toString();
+  }
+
 }
 

Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=732608&r1=732607&r2=732608&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Wed Jan  7 21:04:08 2009
@@ -278,4 +278,8 @@
     QueueInfo qi = jobQueues.get(queue);
     return qi.getWaitingJobCount();
   }
+
+  boolean doesQueueSupportPriorities(String queueName) {
+    return jobQueues.get(queueName).supportsPriorities;
+  }
 }

Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=732608&r1=732607&r2=732608&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Jan  7 21:04:08 2009
@@ -985,8 +985,8 @@
                                           String queue) throws IOException {
     String schedInfo = taskTrackerManager.getQueueManager().
                           getSchedulerInfo(queue).toString();
-    assertTrue(schedInfo.contains("Guaranteed Capacity Maps : " 
-                                    + expectedCapacity));
+    assertTrue(schedInfo.contains("Map tasks\nGuaranteed Capacity: " 
+        + expectedCapacity));
   }
   
   // test capacity transfer
@@ -1301,7 +1301,7 @@
     
     scheduler.reclaimCapacity();
     
-    clock.advance(scheduler.rmConf.getReclaimTimeLimit("default") * 1000);
+    clock.advance(scheduler.schedConf.getReclaimTimeLimit("default") * 1000);
     
     scheduler.reclaimCapacity();
     
@@ -1475,19 +1475,18 @@
     String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
     
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[0] , "Guaranteed Capacity : 50.0 %");
-    assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps * 50/100 + " ");
-    assertEquals(infoStrings[2] , "Guaranteed Capacity Reduces : " + totalReduces * 50/100 + " ");
-    assertEquals(infoStrings[3] , "User Limit : 25 %");
-    assertEquals(infoStrings[4] , "Reclaim Time limit : " + 
-        StringUtils.formatTime(1000000) + " ");
-    assertEquals(infoStrings[5] , "Priority Supported : YES ");
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 0 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[1] , "Guaranteed Capacity Percentage: 50.0%");
+    assertEquals(infoStrings[7] , "Guaranteed Capacity: " + totalMaps * 50/100);
+    assertEquals(infoStrings[11] , "Guaranteed Capacity: " + totalReduces * 50/100);
+    assertEquals(infoStrings[2] , "User Limit: 25%");
+    assertEquals(infoStrings[3] , "Reclaim Time limit: " + 
+        StringUtils.formatTime(1000000));
+    assertEquals(infoStrings[4] , "Priority Supported: YES");
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 0");
+    assertEquals(infoStrings[16] , "Number of users who have submitted jobs: 0");
     assertEquals(schedulingInfo, schedulingInfo2);
     
     //Testing with actual job submission.
@@ -1498,12 +1497,10 @@
     infoStrings = schedulingInfo.split("\n");
     
     //waiting job should be equal to number of jobs submitted.
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] ,
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
     
     //Initialize the jobs but don't raise events
     p.selectJobsToInitialize();
@@ -1511,14 +1508,12 @@
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings.length, 17);
     //should be previous value as nothing is scheduled because no events
     //has been raised after initialization.
-    assertEquals(infoStrings[7] ,
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
     
     //Raise status change event so that jobs can move to running queue.
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
@@ -1529,28 +1524,27 @@
     
     //Get scheduling information, now the number of waiting job should have
     //changed to 4 as one is scheduled and has become running.
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 100.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
+    assertEquals(infoStrings.length, 19);
+    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[17] , "Number of Waiting Jobs: 4");
     
     //assign a reduce task
-    
     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 100.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 100.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
+    assertEquals(infoStrings.length, 21);
+    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[14],"Running tasks: 100.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[19] , "Number of Waiting Jobs: 4");
     
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
@@ -1558,16 +1552,15 @@
     taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
     taskTrackerManager.finalizeJob(u1j1);
     
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] ,
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
-    
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 4");
     
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1576,15 +1569,15 @@
     taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
     //Run initializer to clean up failed jobs
     p.selectJobsToInitialize();
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] ,
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 3 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 3");
     
     //Fail a job which is not initialized but is in the waiting queue.
     FakeJobInProgress u1j5 = userJobs.get(4);
@@ -1594,25 +1587,23 @@
     taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
     //run initializer to clean up failed job
     p.selectJobsToInitialize();
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 2 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 2");
     
     //Raise status change events as none of the intialized jobs would be
     //in running queue as we just failed the second job which was initialized
     //and completed the first one.
-    
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
     
     //Now schedule a map should be job3 of the user as job1 succeeded job2
     //failed and now job3 is running
-    
     t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
     FakeJobInProgress u1j3 = userJobs.get(2);
     assertTrue("User Job 3 not running ", 
@@ -1621,28 +1612,28 @@
     //now the running count of map should be one and waiting jobs should be
     //one. run the poller as it is responsible for waiting count
     p.selectJobsToInitialize();
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 100.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
+    assertEquals(infoStrings.length, 19);
+    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[17] , "Number of Waiting Jobs: 1");
     
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     //Now running counts should become zero
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 1");
     
   }
 



Mime
View raw message