hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r781918 - in /hadoop/core/branches/branch-0.20: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/webapps/job/
Date Fri, 05 Jun 2009 05:59:20 GMT
Author: yhemanth
Date: Fri Jun  5 05:59:19 2009
New Revision: 781918

URL: http://svn.apache.org/viewvc?rev=781918&view=rev
Log:
HADOOP-5884. Fixes accounting in capacity scheduler so that high RAM jobs take more slots. Contributed by Vinod Kumar Vavilapalli.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/branches/branch-0.20/src/webapps/job/jobdetails.jsp

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=781918&r1=781917&r2=781918&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Fri Jun  5 05:59:19 2009
@@ -122,6 +122,9 @@
     happens in MROutputThread after the last call to the map/reduce method, the 
     exception goes undetected. (Amar Kamat via ddas)
 
+    HADOOP-5884. Fixes accounting in capacity scheduler so that high RAM jobs
+    take more slots. (Vinod Kumar Vavilapalli via yhemanth)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=781918&r1=781917&r2=781918&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Jun  5 05:59:19 2009
@@ -164,10 +164,10 @@
    * Sets the capacity of the given queue.
    * 
    * @param queue name of the queue
-   * @param gc percent of the cluster for the queue.
+   * @param capacity percent of the cluster for the queue.
    */
-  public void setCapacity(String queue,float gc) {
-    rmConf.setFloat(toFullPropertyName(queue, "capacity"),gc);
+  public void setCapacity(String queue,float capacity) {
+    rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity);
   }
   
   /**

Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=781918&r1=781917&r2=781918&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Jun  5 05:59:19 2009
@@ -24,8 +24,6 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,8 +32,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
-import org.apache.hadoop.util.StringUtils;
-
 
 /**
  * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
@@ -78,14 +74,20 @@
 
   private static class TaskSchedulingInfo {
     /** 
-     * the actual gc, which depends on how many slots are available
+     * the actual capacity, which depends on how many slots are available
      * in the cluster at any given time. 
      */
     int capacity = 0;
     // number of running tasks
     int numRunningTasks = 0;
-    /** for each user, we need to keep track of number of running tasks */
-    Map<String, Integer> numRunningTasksByUser = 
+    // number of slots occupied by running tasks
+    int numSlotsOccupied = 0;
+
+    /**
+     * for each user, we need to keep track of number of slots occupied by
+     * running tasks
+     */
+    Map<String, Integer> numSlotsOccupiedByUser = 
       new HashMap<String, Integer>();
     
     /**
@@ -93,32 +95,41 @@
      */
     void resetTaskVars() {
       numRunningTasks = 0;
-      for (String s: numRunningTasksByUser.keySet()) {
-        numRunningTasksByUser.put(s, 0);
+      numSlotsOccupied = 0;
+      for (String s: numSlotsOccupiedByUser.keySet()) {
+        numSlotsOccupiedByUser.put(s, Integer.valueOf(0));
       }
     }
 
     /**
      * return information about the tasks
      */
-    public String toString(){
-      float runningTasksAsPercent = capacity!= 0 ?
-          ((float)numRunningTasks * 100/capacity):0;
+    @Override
+    public String toString() {
+      float occupiedSlotsAsPercent =
+          capacity != 0 ? ((float) numSlotsOccupied * 100 / capacity) : 0;
       StringBuffer sb = new StringBuffer();
-      sb.append("Capacity: " + capacity + "\n");
-      sb.append(String.format("Running tasks: %.1f%% of Capacity\n",
-          runningTasksAsPercent));
+      sb.append("Capacity: " + capacity + " slots\n");
+      sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
+          Integer.valueOf(numSlotsOccupied), Float
+              .valueOf(occupiedSlotsAsPercent)));
+      sb.append(String.format("Running tasks: %d\n", Integer
+          .valueOf(numRunningTasks)));
       // include info on active users
-      if (numRunningTasks != 0) {
+      if (numSlotsOccupied != 0) {
         sb.append("Active users:\n");
-        for (Map.Entry<String, Integer> entry: numRunningTasksByUser.entrySet()) {
+        for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
+            .entrySet()) {
           if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
             // user has no tasks running
             continue;
           }
-          sb.append("User '" + entry.getKey()+ "': ");
-          float p = (float)entry.getValue().intValue()*100/numRunningTasks;
-          sb.append(String.format("%.1f%% of running tasks\n", p));
+          sb.append("User '" + entry.getKey() + "': ");
+          int numSlotsOccupiedByThisUser = entry.getValue().intValue();
+          float p =
+              (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
+          sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
+              .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
         }
       }
       return sb.toString();
@@ -152,10 +163,10 @@
     TaskSchedulingInfo mapTSI;
     TaskSchedulingInfo reduceTSI;
     
-    public QueueSchedulingInfo(String queueName, float gcPercent, 
+    public QueueSchedulingInfo(String queueName, float capacityPercent, 
         int ulMin, JobQueuesManager jobQueuesManager) {
       this.queueName = new String(queueName);
-      this.capacityPercent = gcPercent;
+      this.capacityPercent = capacityPercent;
       this.ulMin = ulMin;
       this.jobQueuesManager = jobQueuesManager;
       this.mapTSI = new TaskSchedulingInfo();
@@ -164,13 +175,14 @@
     
     /**
      * return information about the queue
+     * @return a String representing the information about the queue.
      */
+    @Override
     public String toString(){
       // We print out the queue information first, followed by info
       // on map and reduce tasks and job info
       StringBuffer sb = new StringBuffer();
       sb.append("Queue configuration\n");
-      //sb.append("Name: " + queueName + "\n");
       sb.append("Capacity Percentage: ");
       sb.append(capacityPercent);
       sb.append("%\n");
@@ -281,7 +293,15 @@
     protected CapacityTaskScheduler.TYPE type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
-        JobInProgress job) throws IOException; 
+        JobInProgress job) throws IOException;
+
+    int getSlotsOccupied(JobInProgress job) {
+      return getRunningTasks(job) * getSlotsPerTask(job);
+    }
+
+    abstract int getClusterCapacity();
+    abstract int getSlotsPerTask(JobInProgress job);
+    abstract int getRunningTasks(JobInProgress job);
     abstract int getPendingTasks(JobInProgress job);
     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
     /**
@@ -313,12 +333,12 @@
       public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
         TaskSchedulingInfo t1 = getTSI(q1);
         TaskSchedulingInfo t2 = getTSI(q2);
-        // look at how much capacity they've filled. Treat a queue with gc=0 
-        // equivalent to a queue running at capacity
+        // look at how much capacity they've filled. Treat a queue with
+        // capacity=0 equivalent to a queue running at capacity
         double r1 = (0 == t1.capacity)? 1.0f:
-          (double)t1.numRunningTasks/(double)t1.capacity;
+          (double)t1.numSlotsOccupied/(double)t1.capacity;
         double r2 = (0 == t2.capacity)? 1.0f:
-          (double)t2.numRunningTasks/(double)t2.capacity;
+          (double)t2.numSlotsOccupied/(double)t2.capacity;
         if (r1<r2) return -1;
         else if (r1>r2) return 1;
         else return 0;
@@ -340,7 +360,17 @@
     protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
     // and this is the comparator to use
     protected QueueComparator queueComparator;
-   
+
+    // Returns queues sorted according to the QueueComparator.
+    // Mainly for testing purposes.
+    String[] getOrderedQueues() {
+      List<String> queues = new ArrayList<String>(qsiForAssigningTasks.size());
+      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
+        queues.add(qsi.queueName);
+      }
+      return queues.toArray(new String[queues.size()]);
+    }
+
     TaskSchedulingMgr(CapacityTaskScheduler sched) {
       scheduler = sched;
     }
@@ -357,24 +387,26 @@
     }
 
 
-    private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) {
-      // what is our current capacity? It's capacity if we're running below capacity.
-      // If we're running over capacity, then its #running plus 1 (which is the
-      // extra slot we're getting). 
+    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+      // what is our current capacity? It is equal to the queue-capacity if
+      // we're running below capacity. If we're running over capacity, then its
+      // #running plus slotPerTask of the job (which is the number of extra
+      // slots we're getting).
       int currentCapacity;
       TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numRunningTasks < tsi.capacity) {
+      if (tsi.numSlotsOccupied < tsi.capacity) {
         currentCapacity = tsi.capacity;
       }
       else {
-        currentCapacity = tsi.numRunningTasks+1;
+        currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
       }
       int limit = Math.max((int)(Math.ceil((double)currentCapacity/
           (double)qsi.numJobsByUser.size())), 
           (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
-      if (tsi.numRunningTasksByUser.get(user) >= limit) {
-        LOG.debug("User " + user + " is over limit, num running tasks = " + 
-            tsi.numRunningTasksByUser.get(user) + ", limit = " + limit);
+      String user = j.getProfile().getUser();
+      if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
+        LOG.debug("User " + user + " is over limit, num slots occupied = " + 
+            tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit);
         return true;
       }
       else {
@@ -403,7 +435,7 @@
           continue;
         }
         // check if the job's user is over limit
-        if (isUserOverLimit(j.getProfile().getUser(), qsi)) {
+        if (isUserOverLimit(j, qsi)) {
           continue;
         } 
         //If this job meets memory requirements. Ask the JobInProgress for
@@ -489,8 +521,11 @@
     // The caller is responsible for ensuring that the QSI objects and the 
     // collections are up-to-date.
     private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+
+      printQSIs();
+
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        // we may have queues with gc=0. We shouldn't look at jobs from 
+        // we may have queues with capacity=0. We shouldn't look at jobs from 
         // these queues
         if (0 == getTSI(qsi).capacity) {
           continue;
@@ -516,20 +551,23 @@
       // nothing to give
       return TaskLookupResult.getNoTaskFoundResult();
     }
-    
+
     // for debugging.
     private void printQSIs() {
-      StringBuffer s = new StringBuffer();
-      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
-        TaskSchedulingInfo tsi = getTSI(qsi);
-        Collection<JobInProgress> runJobs = 
-          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
-        s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
-            tsi.numRunningTasks + ", gc=" + tsi.capacity
-            + ", run jobs="+ runJobs.size() +
-            "*** ");
+      if (LOG.isDebugEnabled()) {
+        StringBuffer s = new StringBuffer();
+        for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
+          TaskSchedulingInfo tsi = getTSI(qsi);
+          Collection<JobInProgress> runJobs =
+              scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+          s.append(String.format(" Queue '%s'(%s): runningTasks=%d, "
+              + "occupiedSlots=%d, capacity=%d, runJobs=%d", qsi.queueName,
+              this.type, Integer.valueOf(tsi.numRunningTasks), Integer
+                  .valueOf(tsi.numSlotsOccupied), Integer
+                  .valueOf(tsi.capacity), Integer.valueOf(runJobs.size())));
+        }
+        LOG.debug(s);
       }
-      LOG.debug(s);
     }
     
     /**
@@ -559,11 +597,14 @@
    * The scheduling algorithms for map tasks. 
    */
   private static class MapSchedulingMgr extends TaskSchedulingMgr {
-    MapSchedulingMgr(CapacityTaskScheduler dad) {
-      super(dad);
+
+    MapSchedulingMgr(CapacityTaskScheduler schedulr) {
+      super(schedulr);
       type = CapacityTaskScheduler.TYPE.MAP;
       queueComparator = mapComparator;
     }
+
+    @Override
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
       ClusterStatus clusterStatus = 
@@ -572,16 +613,30 @@
       return job.obtainNewMapTask(taskTracker, numTaskTrackers, 
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
     }
+
+    @Override
     int getClusterCapacity() {
       return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks();
     }
+
+    @Override
     int getRunningTasks(JobInProgress job) {
       return job.runningMaps();
     }
+
+    @Override
     int getPendingTasks(JobInProgress job) {
       return job.pendingMaps();
     }
 
+    @Override
+    int getSlotsPerTask(JobInProgress job) {
+      long myVmem = job.getJobConf().getMemoryForMapTask();
+      return (int) (Math.ceil((float) myVmem
+          / (float) scheduler.getMemSizeForMapSlot()));
+    }
+
+    @Override
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.mapTSI;
     }
@@ -601,11 +656,14 @@
    * The scheduling algorithms for reduce tasks. 
    */
   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
-    ReduceSchedulingMgr(CapacityTaskScheduler dad) {
-      super(dad);
+
+    ReduceSchedulingMgr(CapacityTaskScheduler schedulr) {
+      super(schedulr);
       type = CapacityTaskScheduler.TYPE.REDUCE;
       queueComparator = reduceComparator;
     }
+
+    @Override
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
       ClusterStatus clusterStatus = 
@@ -614,16 +672,31 @@
       return job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
     }
+
+    @Override
     int getClusterCapacity() {
-      return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+      return scheduler.taskTrackerManager.getClusterStatus()
+          .getMaxReduceTasks();
     }
+
+    @Override
     int getRunningTasks(JobInProgress job) {
       return job.runningReduces();
     }
+
+    @Override
     int getPendingTasks(JobInProgress job) {
       return job.pendingReduces();
     }
 
+    @Override
+    int getSlotsPerTask(JobInProgress job) {
+      long myVmem = job.getJobConf().getMemoryForReduceTask();
+      return (int) (Math.ceil((float) myVmem
+          / (float) scheduler.getMemSizeForReduceSlot()));
+    }
+
+    @Override
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.reduceTSI;
     }
@@ -655,7 +728,10 @@
   protected CapacitySchedulerConf schedConf;
   /** whether scheduler has started or not */
   private boolean started = false;
-  
+
+  static String JOB_SCHEDULING_INFO_FORMAT_STRING =
+      "%s running map tasks using %d map slots,"
+          + " %s running reduce tasks using %d reduce slots.";
   /**
    * A clock class - can be mocked out for testing.
    */
@@ -664,6 +740,7 @@
       return System.currentTimeMillis();
     }
   }
+
   // can be replaced with a global type, if we have one
   protected static enum TYPE {
     MAP, REDUCE
@@ -709,12 +786,13 @@
         JobConf.normalizeMemoryConfigValue(conf.getLong(
             JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
-    LOG.info(new StringBuilder().append("Scheduler configured with ").append(
-        "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
-        " limitMaxMemForMapTasks, limitMaxMemForReduceTasks)").append(
-        memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
-        .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
-            limitMaxMemForReduceTasks).append(")"));
+    LOG.info(String.format("Scheduler configured with "
+        + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
+        + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
+        + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long
+        .valueOf(memSizeForReduceSlotOnJT), Long
+        .valueOf(limitMaxMemForMapTasks), Long
+        .valueOf(limitMaxMemForReduceTasks)));
   }
 
   long getMemSizeForMapSlot() {
@@ -733,6 +811,15 @@
     return limitMaxMemForReduceTasks;
   }
 
+  String[] getOrderedQueues(CapacityTaskScheduler.TYPE type) {
+    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      return mapScheduler.getOrderedQueues();
+    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      return reduceScheduler.getOrderedQueues();
+    }
+    return null;
+  }
+
   @Override
   public synchronized void start() throws IOException {
     if (started) return;
@@ -755,15 +842,15 @@
     Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
     float totalCapacity = 0.0f;
     for (String queueName: queues) {
-      float gc = schedConf.getCapacity(queueName);
-      if(gc == -1.0) {
+      float capacity = schedConf.getCapacity(queueName);
+      if(capacity == -1.0) {
         queuesWithoutConfiguredCapacity.add(queueName);
       }else {
-        totalCapacity += gc;
+        totalCapacity += capacity;
       }
       int ulMin = schedConf.getMinimumUserLimitPercent(queueName); 
       // create our QSI and add to our hashmap
-      QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc, 
+      QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, capacity, 
                                                     ulMin, jobQueuesManager);
       queueInfoMap.put(queueName, qsi);
 
@@ -877,29 +964,49 @@
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        int runningMaps = j.runningMaps();
-        int runningReduces = j.runningReduces();
-        qsi.mapTSI.numRunningTasks += runningMaps;
-        qsi.reduceTSI.numRunningTasks += runningReduces;
+
+        int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
+        int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
+        int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
+        int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
+        j.setSchedulingInfo(String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
+            Integer.valueOf(numMapsRunningForThisJob), Integer
+                .valueOf(numMapSlotsForThisJob), Integer
+                .valueOf(numReducesRunningForThisJob), Integer
+                .valueOf(numReduceSlotsForThisJob)));
+        qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
+        qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
+        qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
+        qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob;
         Integer i =
-          qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
-        qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
-            i+runningMaps);
-        i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
-        qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
-            i+runningReduces);
-        LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
-            j.runningMaps() + ", run(r) = " + j.runningReduces() +
-            ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
-            j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
-            ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
-            j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
-            + ", total(m) = " + j.numMapTasks + ", total(r) = " +
-            j.numReduceTasks);
+            qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
+        qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
+            Integer.valueOf(i.intValue() + numMapSlotsForThisJob));
+        i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
+        qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
+            Integer.valueOf(i.intValue() + numReduceSlotsForThisJob));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
+              + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
+              + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
+              + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
+              .getJobID().toString(), Integer
+              .valueOf(numMapsRunningForThisJob), Integer
+              .valueOf(numMapSlotsForThisJob), Integer
+              .valueOf(numReducesRunningForThisJob), Integer
+              .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
+              .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
+              .valueOf(j.failedMapTasks),
+              Integer.valueOf(j.failedReduceTasks), Integer
+                  .valueOf(j.speculativeMapTasks), Integer
+                  .valueOf(j.speculativeReduceTasks), Integer
+                  .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
+        }
+
         /*
          * it's fine walking down the entire list of running jobs - there
          * probably will not be many, plus, we may need to go through the
-         * list to compute numRunningTasksByUser. If this is expensive, we
+         * list to compute numSlotsOccupiedByUser. If this is expensive, we
          * can keep a list of running jobs per user. Then we only need to
          * consider the first few jobs per user.
          */
@@ -914,10 +1021,8 @@
    * The grand plan for assigning a task. 
    * First, decide whether a Map or Reduce task should be given to a TT 
    * (if the TT can accept either). 
-   * Next, pick a queue. We only look at queues that need a slot. Among
-   * these, we first look at queues whose ac is less than gc (queues that 
-   * gave up capacity in the past). Next, we look at any other queue that
-   * needs a slot. 
+   * Next, pick a queue. We only look at queues that need a slot. Among these,
+   * we first look at queues whose (# of running tasks)/capacity is the least.
    * Next, pick a job in a queue. we pick the job at the front of the queue
    * unless its user is over the user limit. 
    * Finally, given a job, pick a task from the job. 
@@ -1019,8 +1124,10 @@
     if (null == i) {
       i = 1;
       // set the count for running tasks to 0
-      qsi.mapTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
-      qsi.reduceTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
+      qsi.mapTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
+          Integer.valueOf(0));
+      qsi.reduceTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
+          Integer.valueOf(0));
     }
     else {
       i++;
@@ -1042,8 +1149,8 @@
     if (0 == i.intValue()) {
       qsi.numJobsByUser.remove(job.getProfile().getUser());
       // remove job footprint from our TSIs
-      qsi.mapTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
-      qsi.reduceTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
+      qsi.mapTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
+      qsi.reduceTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
       LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
     }
     else {

Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=781918&r1=781917&r2=781918&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Jun  5 05:59:19 2009
@@ -39,8 +39,6 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
-
-
 public class TestCapacityScheduler extends TestCase {
 
   static final Log LOG =
@@ -605,13 +603,13 @@
   // represents a fake queue configuration info
   static class FakeQueueInfo {
     String queueName;
-    float gc;
+    float capacity;
     boolean supportsPrio;
     int ulMin;
 
-    public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) {
+    public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
       this.queueName = queueName;
-      this.gc = gc;
+      this.capacity = capacity;
       this.supportsPrio = supportsPrio;
       this.ulMin = ulMin;
     }
@@ -641,10 +639,10 @@
     }*/
     
     public float getCapacity(String queue) {
-      if(queueMap.get(queue).gc == -1) {
+      if(queueMap.get(queue).capacity == -1) {
         return super.getCapacity(queue);
       }
-      return queueMap.get(queue).gc;
+      return queueMap.get(queue).capacity;
     }
     
     public int getMinimumUserLimitPercent(String queue) {
@@ -899,13 +897,6 @@
     return queue.toArray(new JobInProgress[0]);
   }
   
-  /*protected void submitJobs(int number, int state, int maps, int reduces)
-    throws IOException {
-    for (int i = 0; i < number; i++) {
-      submitJob(state, maps, reduces);
-    }
-  }*/
-  
   // tests if tasks can be assinged when there are multiple jobs from a same
   // user
   public void testJobFinished() throws Exception {
@@ -1042,7 +1033,7 @@
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    // set the gc % as 10%, so that gc will be zero initially as 
+    // set the capacity % as 10%, so that capacity will be zero initially as 
     // the cluster capacity increase slowly.
     queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
@@ -1076,7 +1067,7 @@
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt5");
     // now job from default should run, as it is furthest away
-    // in terms of runningMaps / gc.
+    // in terms of runningMaps / capacity.
     checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
     verifyCapacity("1", "default");
     verifyCapacity("9", "q2");
@@ -1087,7 +1078,7 @@
     String schedInfo = taskTrackerManager.getQueueManager().
                           getSchedulerInfo(queue).toString();    
     assertTrue(schedInfo.contains("Map tasks\nCapacity: " 
-        + expectedCapacity));
+        + expectedCapacity + " slots"));
   }
   
   // test capacity transfer
@@ -1268,7 +1259,82 @@
     // first in the queue
     checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
   }
-  
+
+  /**
+   * Test to verify that high memory jobs hit user limits faster than any normal
+   * job.
+   * 
+   * @throws IOException
+   */
+  public void testUserLimitsForHighMemoryJobs()
+      throws IOException {
+    taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    String[] qs = { "default" };
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+    resConf.setFakeQueues(queues);
+    // enabled memory-based scheduling
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // Submit one normal job to the other queue.
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setUser("u1");
+    jConf.setQueueName("default");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+        + "6 map and 6 reduce tasks");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setQueueName("default");
+    jConf.setUser("u2");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Verify that normal job takes 3 task assignments to hit user limits
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+    // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
+    // are hit. So u2 should get slots
+
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+
+    // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+    // slots. Because of high memory tasks, giving u2 another task would
+    // overflow limits. So, no more tasks should be given to anyone.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
   /*
    * Following is the testing strategy for testing scheduling information.
    * - start capacity scheduler with two queues.
@@ -1318,21 +1384,35 @@
     scheduler.assignTasks(tracker("tt1")); // heartbeat
     scheduler.assignTasks(tracker("tt2")); // heartbeat
     int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
-    int totalReduces = taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+    int totalReduces =
+        taskTrackerManager.getClusterStatus().getMaxReduceTasks();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
-    String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
-    String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
+    String schedulingInfo =
+        queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    String schedulingInfo2 =
+        queueManager.getJobQueueInfo("q2").getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%");
-    assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100);
-    assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100);
-    assertEquals(infoStrings[2] , "User Limit: 25%");
-    assertEquals(infoStrings[3] , "Priority Supported: YES");
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0");
-    assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[0], "Queue configuration");
+    assertEquals(infoStrings[1], "Capacity Percentage: 50.0%");
+    assertEquals(infoStrings[2], "User Limit: 25%");
+    assertEquals(infoStrings[3], "Priority Supported: YES");
+    assertEquals(infoStrings[4], "-------------");
+    assertEquals(infoStrings[5], "Map tasks");
+    assertEquals(infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+        + " slots");
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[9], "-------------");
+    assertEquals(infoStrings[10], "Reduce tasks");
+    assertEquals(infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+        + " slots");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[14], "-------------");
+    assertEquals(infoStrings[15], "Job info");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 0");
+    assertEquals(infoStrings[17], "Number of users who have submitted jobs: 0");
     assertEquals(schedulingInfo, schedulingInfo2);
 
     //Testing with actual job submission.
@@ -1343,10 +1423,13 @@
     infoStrings = schedulingInfo.split("\n");
 
     //waiting job should be equal to number of jobs submitted.
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings[17], "Number of users who have submitted jobs: 1");
 
     //Initalize the jobs but don't raise events
     controlledInitializationPoller.selectJobsToInitialize();
@@ -1354,12 +1437,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings.length, 18);
     //should be previous value as nothing is scheduled because no events
     //has been raised after initialization.
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
 
     //Raise status change event so that jobs can move to running queue.
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
@@ -1376,10 +1461,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 18);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 0");
+    assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
 
     //assign a reduce task
     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
@@ -1388,10 +1477,16 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 22);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[16], "Active users:");
+    assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
 
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
@@ -1404,10 +1499,12 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 4");
 
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1421,10 +1518,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3");
+    assertEquals(infoStrings.length, 18);
+    //should be previous value as nothing is scheduled because no events
+    //has been raised after initialization.
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 3");
 
     //Fail a job which is not initialized but is in the waiting queue.
     FakeJobInProgress u1j5 = userJobs.get(4);
@@ -1439,10 +1540,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2");
+    assertEquals(infoStrings.length, 18);
+    //should be previous value as nothing is scheduled because no events
+    //has been raised after initialization.
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 2");
 
     //Raise status change events as none of the intialized jobs would be
     //in running queue as we just failed the second job which was initialized
@@ -1465,10 +1570,12 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 18);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1");
+    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
 
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1478,11 +1585,10 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1");
-
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 1");
   }
 
   /**
@@ -1556,8 +1662,10 @@
     scheduler.start();
 
     // The situation : Two jobs in the queue. First job with only maps and no
-    // reduces and is a high memory job. Second job is a normal job with both maps and reduces.
-    // First job cannot run for want of memory for maps. In this case, second job's reduces should run.
+    // reduces and is a high memory job. Second job is a normal job with both
+    // maps and reduces.
+    // First job cannot run for want of memory for maps. In this case, second
+    // job's reduces should run.
     
     LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
         + "2 map tasks");
@@ -1583,11 +1691,16 @@
     
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
     // at this point, the scheduler tries to schedule another map from j1. 
     // there isn't enough space. The second job's reduce should be scheduled.
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        100.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
   }
 
@@ -1622,7 +1735,7 @@
     scheduler.start();
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
-        + "1 map, 0 reduce tasks.");
+        + "1 map, 1 reduce tasks.");
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForReduceTask(1 * 1024);
@@ -1634,8 +1747,19 @@
 
     // Fill the second tt with this job.
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
+        (String) job1.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        25.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
+        (String) job1.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
 
     LOG.debug("Submit one high memory(2GB maps/reduces) job of "
@@ -1650,8 +1774,20 @@
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 3 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
+        (String) job2.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 3 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+        75.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        (String) job2.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
@@ -1670,8 +1806,17 @@
     assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+        75.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        (String) job2.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
+        (String) job3.getSchedulingInfo());
   }
 
   /**
@@ -1720,10 +1865,23 @@
 
     // 1st cycle - 1 map gets assigned.
     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
     checkMemReservedForTasksOnTT("tt1",  512L, 0L);
+
+    // 1st cycle of reduces - 1 reduce gets assigned.
+    Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        50.0f);
+    checkMemReservedForTasksOnTT("tt1",  512L, 512L);
     
     // kill this job !
     taskTrackerManager.killJob(job1.getJobID());
+    // No more map/reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
+        0.0f);
     
     // retire the job
     taskTrackerManager.removeJob(job1.getJobID());
@@ -1748,57 +1906,23 @@
     assertNull(scheduler.assignTasks(tracker("tt1")));
     checkMemReservedForTasksOnTT("tt1", null, null);
     
-    // finish the task on the tracker.
+    // finish the tasks on the tracker.
     taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
+    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
+
     // now a new task can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 1 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
     checkMemReservedForTasksOnTT("tt1", 512L, 0L);
 
     // reduce can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 1 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        50.0f);
     checkMemReservedForTasksOnTT("tt1", 512L, 512L);
   }
-  
-  protected TaskTrackerStatus tracker(String taskTrackerName) {
-    return taskTrackerManager.getTaskTracker(taskTrackerName);
-  }
-  
-  protected Task checkAssignment(String taskTrackerName,
-      String expectedTaskString) throws IOException {
-    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
-    assertNotNull(expectedTaskString, tasks);
-    assertEquals(expectedTaskString, 1, tasks.size());
-    assertEquals(expectedTaskString, tasks.get(0).toString());
-    return tasks.get(0);
-  }
-
-  /**
-   * Get the amount of memory that is reserved for tasks on the taskTracker and
-   * verify that it matches what is expected.
-   * 
-   * @param taskTracker
-   * @param expectedMemForMapsOnTT
-   * @param expectedMemForReducesOnTT
-   */
-  private void checkMemReservedForTasksOnTT(String taskTracker,
-      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
-    Long observedMemForMapsOnTT =
-        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
-            CapacityTaskScheduler.TYPE.MAP);
-    Long observedMemForReducesOnTT =
-        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
-            CapacityTaskScheduler.TYPE.REDUCE);
-    if (expectedMemForMapsOnTT == null) {
-      assertTrue(observedMemForMapsOnTT == null);
-    } else {
-      assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
-    }
-    if (expectedMemForReducesOnTT == null) {
-      assertTrue(observedMemForReducesOnTT == null);
-    } else {
-      assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
-    }
-  }
 
   /*
    * Test cases for Job Initialization poller.
@@ -2006,10 +2130,9 @@
     checkFailedInitializedJobMovement();
 
     // Check failed waiting job movement
-    checkFailedWaitingJobMovement();
-    
+    checkFailedWaitingJobMovement(); 
   }
-  
+
   public void testStartWithoutDefaultQueueConfigured() throws Exception {
     //configure a single queue which is not default queue
     String[] qs = {"q1"};
@@ -2189,9 +2312,9 @@
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
     taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
-    taskTrackerManager.finalizeJob(fjob2);
-    
+    taskTrackerManager.finalizeJob(fjob2);    
   }
+
   /**
    * Test case to test scheduling of
    * <ol> 
@@ -2207,17 +2330,16 @@
    * <ul>
    * <li>Submit one high ram job which has speculative reduce.</li>
    * <li>Submit a normal job which has no speculative reduce.</li>
-   * <li>Scheduler should schedule first all reduce tasks from first job and block
-   * the cluster till both reduces are completed.</li>
+   * <li>Scheduler should schedule first all reduce tasks from first job and
+   * block the cluster till both reduces are completed.</li>
    * </ul>
    * </li>
    * </ol>
    * @throws IOException
    */
   public void testHighRamJobWithSpeculativeExecution() throws IOException {
-    // 2 map and 2 reduce slots
+    // 2 TTs, 3 map and 3 reduce slots on each TT
     taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
-    // 1GB for each map, 1GB for each reduce
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2225,6 +2347,7 @@
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
+    // 1GB for each map, 1GB for each reduce
     scheduler.getConf().setLong(
         JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
         3 * 1024L);
@@ -2248,8 +2371,9 @@
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(true);
     jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
-          jConf, taskTrackerManager,"u1");
+    FakeJobInProgress job1 =
+        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+            taskTrackerManager, "u1");
     taskTrackerManager.submitJob(job1);
 
     //Submit normal job
@@ -2273,21 +2397,29 @@
     //complete none of the tasks from other jobs would be scheduled.
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
     assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+
     //make same tracker get back, check if you are blocking. Your job
     //has speculative map task so tracker should be blocked even tho' it
     //can run job2's map.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
     //TT2 now gets speculative map of the job1
     checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+    // Total 4 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
     checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
 
     // Now since the first job has no more speculative maps, it can schedule
     // the second job.
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 5 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
     checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
 
     //finish everything
@@ -2310,14 +2442,15 @@
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(false);
     jConf.setReduceSpeculativeExecution(true);
-    FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
-          jConf, taskTrackerManager,"u1");
+    FakeJobInProgress job3 =
+        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+            taskTrackerManager, "u1");
     taskTrackerManager.submitJob(job3);
 
     //Submit normal job w.r.t reduces
     jConf = new JobConf();
-    jConf.setMemoryForMapTask(2 * 1024L);
-    jConf.setMemoryForReduceTask(1 * 104L);
+    jConf.setMemoryForMapTask(1 * 1024L);
+    jConf.setMemoryForReduceTask(1 * 1024L);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
@@ -2331,32 +2464,167 @@
 
     // Finish up the map scheduler
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
-    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
+    // Total 3 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
 
     // first, a reduce from j3 will run
     // at this point, there is a speculative task for the same job to be
     //scheduled. This task would be scheduled. Till the tasks from job3 gets
     //complete none of the tasks from other jobs would be scheduled.
     checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
-    assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0);
+    assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
+        0);
+    // Total 2 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+        33.3f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
 
     //make same tracker get back, check if you are blocking. Your job
     //has speculative reduce task so tracker should be blocked even tho' it
     //can run job4's reduce.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    //TT2 now gets speculative map of the job1
+    // Total 2 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+        33.3f);
+
+    //TT2 now gets speculative reduce of the job3
     checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
-    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+    // Total 4 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
+        66.7f);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
 
     // Now since j3 has no more speculative reduces, it can schedule
     // the j4.
     checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+    // Total 5 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
+        83.3f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
   }
 
+  /**
+   * Test to verify that queue ordering is based on the number of slots occupied
+   * and hence to verify that presence of high memory jobs is reflected properly
+   * while determining used capacities of queues and hence the queue ordering.
+   * 
+   * @throws IOException
+   */
+  public void testQueueOrdering()
+      throws IOException {
+    taskTrackerManager = new FakeTaskTrackerManager(2, 6, 6);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    String[] qs = { "default", "q1" };
+    String[] reversedQs = { qs[1], qs[0] };
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 100));
+    queues.add(new FakeQueueInfo("q1", 50.0f, true, 100));
+    resConf.setFakeQueues(queues);
+    // enabled memory-based scheduling
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+        + "6 map and 6 reduce tasks");
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Submit a normal job to the other queue.
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setUser("u1");
+    jConf.setQueueName("q1");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Map 1 of high memory job
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 1 of high memory job
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 1 of normal job
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 1 of normal job
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 2 of normal job
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 2 of normal job
+    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Now both the queues are equally served. But the comparator doesn't change
+    // the order if queues are equally served.
+
+    // Map 3 of normal job
+    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 3 of normal job
+    checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 2 of high memory job
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 2 of high memory job
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 4 of normal job
+    checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 4 of normal job
+    checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+  }
+
   private void checkFailedInitializedJobMovement() throws IOException {
     
     JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2436,6 +2704,89 @@
       userJobs.put(user, jips);
     }
     return userJobs;
+  }
+
+  
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+  
+  protected Task checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+    return tasks.get(0);
+  }
+
+  /**
+   * Get the amount of memory that is reserved for tasks on the taskTracker and
+   * verify that it matches what is expected.
+   * 
+   * @param taskTracker
+   * @param expectedMemForMapsOnTT
+   * @param expectedMemForReducesOnTT
+   */
+  private void checkMemReservedForTasksOnTT(String taskTracker,
+      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+    Long observedMemForMapsOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.MAP);
+    Long observedMemForReducesOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.REDUCE);
+    if (expectedMemForMapsOnTT == null) {
+      assertTrue(observedMemForMapsOnTT == null);
+    } else {
+      assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+    }
+    if (expectedMemForReducesOnTT == null) {
+      assertTrue(observedMemForReducesOnTT == null);
+    } else {
+      assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+    }
+  }
 
+  /**
+   * Verify the number of slots of type 'type' from the queue 'queue'.
+   * 
+   * @param queue
+   * @param type
+   * @param numActiveUsers in the queue at present.
+   * @param expectedOccupiedSlots
+   * @param expectedOccupiedSlotsPercent
+   * @return
+   */
+  private void checkOccupiedSlots(String queue,
+      CapacityTaskScheduler.TYPE type, int numActiveUsers,
+      int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+    scheduler.updateQSIInfoForTests();
+    QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
+    String schedulingInfo =
+        queueManager.getJobQueueInfo(queue).getSchedulingInfo();
+    String[] infoStrings = schedulingInfo.split("\n");
+    int index = -1;
+    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      index = 7;
+    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+    }
+    LOG.info(infoStrings[index]);
+    assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
+        expectedOccupiedSlots, expectedOccupiedSlotsPercent),
+        infoStrings[index]);
+  }
+
+  private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
+    assertTrue("Observed and expected queues are not of same length.",
+        expectedOrder.length == observedOrder.length);
+    int i = 0;
+    for (String expectedQ : expectedOrder) {
+      assertTrue("Observed and expected queues are not in the same order. "
+          + "Differ at index " + i + ". Got " + observedOrder[i]
+          + " instead of " + expectedQ, expectedQ.equals(observedOrder[i]));
+      i++;
+    }
   }
 }

Modified: hadoop/core/branches/branch-0.20/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/webapps/job/jobdetails.jsp?rev=781918&r1=781917&r2=781918&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/core/branches/branch-0.20/src/webapps/job/jobdetails.jsp Fri Jun  5 05:59:19 2009
@@ -248,6 +248,10 @@
           "<a href=\"jobblacklistedtrackers.jsp?jobid=" + jobId + "\">" +
           flakyTaskTrackers + "</a><br>\n");
     }
+    if (job.getSchedulingInfo() != null) {
+      out.print("<b>Job Scheduling information: </b>" +
+          job.getSchedulingInfo().toString() +"\n");
+    }
     out.print("<hr>\n");
     out.print("<table border=2 cellpadding=\"5\" cellspacing=\"2\">");
     out.print("<tr><th>Kind</th><th>% Complete</th><th>Num Tasks</th>" +



Mime
View raw message