hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077573 [2/3] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/
Date Fri, 04 Mar 2011 04:30:34 GMT
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 04:30:34 2011
@@ -58,261 +58,9 @@ import org.apache.hadoop.mapreduce.serve
 class CapacityTaskScheduler extends TaskScheduler {
 
 
-  /***********************************************************************
-   * Keeping track of scheduling information for queues
-   * 
-   * We need to maintain scheduling information relevant to a queue (its 
-   * name, capacity, etc), along with information specific to 
-   * each kind of task, Map or Reduce (num of running tasks, pending 
-   * tasks etc). 
-   * 
-   * This scheduling information is used to decide how to allocate
-   * tasks, redistribute capacity, etc.
-   *  
-   * A QueueSchedulingInfo(QSI) object represents scheduling information for
-   * a queue. A TaskSchedulingInfo (TSI) object represents scheduling 
-   * information for a particular kind of task (Map or Reduce).
-   *   
-   **********************************************************************/
-
-  static class TaskSchedulingInfo {
-
-    /** 
-     * the actual capacity, which depends on how many slots are available
-     * in the cluster at any given time. 
-     */
-    private int capacity = 0;
-    // number of running tasks
-    int numRunningTasks = 0;
-    // number of slots occupied by running tasks
-    int numSlotsOccupied = 0;
-
-    //the actual maximum capacity which depends on how many slots are available
-    //in cluster at any given time.
-    private int maxCapacity = -1;
-
-    /**
-     * for each user, we need to keep track of number of slots occupied by
-     * running tasks
-     */
-    Map<String, Integer> numSlotsOccupiedByUser = 
-      new HashMap<String, Integer>();
-
-    /**
-     * reset the variables associated with tasks
-     */
-    void resetTaskVars() {
-      numRunningTasks = 0;
-      numSlotsOccupied = 0;
-      for (String s: numSlotsOccupiedByUser.keySet()) {
-        numSlotsOccupiedByUser.put(s, Integer.valueOf(0));
-      }
-    }
-
-
-    /**
-     * Returns the actual capacity.
-     * capacity.
-     *
-     * @return
-     */
-    int getCapacity() {
-      return capacity;
-    }
-
-    /**
-     * Mutator method for capacity
-     *
-     * @param capacity
-     */
-    void setCapacity(int capacity) {
-        this.capacity = capacity;
-    }
-
-    /**
-     * @return the numRunningTasks
-     */
-    int getNumRunningTasks() {
-      return numRunningTasks;
-    }
-
-    /**
-     * @return the numSlotsOccupied
-     */
-    int getNumSlotsOccupied() {
-      return numSlotsOccupied;
-    }
-
-    /**
-     * return information about the tasks
-     */
-    @Override
-    public String toString() {
-      float occupiedSlotsAsPercent =
-          getCapacity() != 0 ?
-            ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
-      StringBuffer sb = new StringBuffer();
-      
-      sb.append("Capacity: " + capacity + " slots\n");
-      
-      if(getMaxCapacity() >= 0) {
-        sb.append("Maximum capacity: " + getMaxCapacity() +" slots\n");
-      }
-      sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
-          Integer.valueOf(numSlotsOccupied), Float
-              .valueOf(occupiedSlotsAsPercent)));
-      sb.append(String.format("Running tasks: %d\n", Integer
-          .valueOf(numRunningTasks)));
-      // include info on active users
-      if (numSlotsOccupied != 0) {
-        sb.append("Active users:\n");
-        for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
-            .entrySet()) {
-          if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
-            // user has no tasks running
-            continue;
-          }
-          sb.append("User '" + entry.getKey() + "': ");
-          int numSlotsOccupiedByThisUser = entry.getValue().intValue();
-          float p =
-              (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
-          sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
-              .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
-        }
-      }
-      return sb.toString();
-    }
-
-    int getMaxCapacity() {
-      return maxCapacity;
-    }
-
-    void setMaxCapacity(int maxCapacity) {
-      this.maxCapacity = maxCapacity;
-    }
-  }
-  
-  static class QueueSchedulingInfo {
-    String queueName;
-
-    /**
-     * capacity(%) is set in the config
-     */
-    float capacityPercent = 0;
-    
-    
-  /**
-   * maxCapacityPercent(%) is set in config as
-   * mapred.capacity-scheduler.queue.<queue-name>.maximum-capacity
-   * maximum-capacity percent defines a limit beyond which a queue
-   * cannot expand. Remember this limit is dynamic and changes w.r.t
-   * cluster size.
-   */
-    float maxCapacityPercent = -1;
-    /** 
-     * to handle user limits, we need to know how many users have jobs in 
-     * the queue.
-     */  
-    Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
-      
-    /**
-     * min value of user limit (same for all users)
-     */
-    int ulMin;
-
-    /**
-     * We keep track of the JobQueuesManager only for reporting purposes 
-     * (in toString()). 
-     */
-    private JobQueuesManager jobQueuesManager;
-    
-    /**
-     * We keep a TaskSchedulingInfo object for each kind of task we support
-     */
-    TaskSchedulingInfo mapTSI;
-    TaskSchedulingInfo reduceTSI;
-    
-    public QueueSchedulingInfo(
-      String queueName, float capacityPercent,
-      float maxCapacityPercent, int ulMin, JobQueuesManager jobQueuesManager
-    ) {
-      this.queueName = new String(queueName);
-      this.capacityPercent = capacityPercent;
-      this.maxCapacityPercent = maxCapacityPercent;
-      this.ulMin = ulMin;
-      this.jobQueuesManager = jobQueuesManager;
-      this.mapTSI = new TaskSchedulingInfo();
-      this.reduceTSI = new TaskSchedulingInfo();
-    }
-    
-
-    /**
-     * @return the queueName
-     */
-    String getQueueName() {
-      return queueName;
-    }
-
-    /**
-     * @return the capacityPercent
-     */
-    float getCapacityPercent() {
-      return capacityPercent;
-    }
-
-    /**
-     * @return the mapTSI
-     */
-    TaskSchedulingInfo getMapTSI() {
-      return mapTSI;
-    }
-
-    /**
-     * @return the reduceTSI
-     */
-    TaskSchedulingInfo getReduceTSI() {
-      return reduceTSI;
-    }
-
-    /**
-     * return information about the queue
-     *
-     * @return a String representing the information about the queue.
-     */
-    @Override
-    public String toString(){
-      // We print out the queue information first, followed by info
-      // on map and reduce tasks and job info
-      StringBuffer sb = new StringBuffer();
-      sb.append("Queue configuration\n");
-      sb.append("Capacity Percentage: ");
-      sb.append(capacityPercent);
-      sb.append("%\n");
-      sb.append(String.format("User Limit: %d%s\n",ulMin, "%"));
-      sb.append(String.format("Priority Supported: %s\n",
-          (jobQueuesManager.doesQueueSupportPriorities(queueName))?
-              "YES":"NO"));
-      sb.append("-------------\n");
-
-      sb.append("Map tasks\n");
-      sb.append(mapTSI.toString());
-      sb.append("-------------\n");
-      sb.append("Reduce tasks\n");
-      sb.append(reduceTSI.toString());
-      sb.append("-------------\n");
-      
-      sb.append("Job info\n");
-      sb.append(String.format("Number of Waiting Jobs: %d\n", 
-          jobQueuesManager.getWaitingJobCount(queueName)));
-      sb.append(String.format("Number of users who have submitted jobs: %d\n", 
-          numJobsByUser.size()));
-      return sb.toString();
-    }
-  }
-
-  /** quick way to get qsi object given a queue name */
-  Map<String, QueueSchedulingInfo> queueInfoMap = 
-    new HashMap<String, QueueSchedulingInfo>();
+  /** quick way to get Queue object given a queue name */
+  Map<String, CapacitySchedulerQueue> queueInfoMap = 
+    new HashMap<String, CapacitySchedulerQueue>();
   
   /**
    * This class captures scheduling information we want to display or log.
@@ -328,12 +76,12 @@ class CapacityTaskScheduler extends Task
     
     @Override
     public String toString(){
-      // note that we do not call updateQSIObjects() here for performance
+      // note that we do not call updateAllQueues() here for performance
       // reasons. This means that the data we print out may be slightly
       // stale. This data is updated whenever assignTasks() is called
       // If this doesn't happen, the data gets stale. If we see
       // this often, we may need to detect this situation and call 
-      // updateQSIObjects(), or just call it each time. 
+      // updateAllQueues(), or just call it each time. 
       return scheduler.getDisplayInfo(queueName);
     }
   }
@@ -405,9 +153,6 @@ class CapacityTaskScheduler extends Task
     protected CapacityTaskScheduler scheduler;
     protected TaskType type = null;
 
-    abstract void updateTSI(QueueSchedulingInfo qsi, String user, 
-                            int numRunningTasks, int numSlotsOccupied);
-
     abstract TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job, boolean assignOffSwitch) throws IOException;
 
@@ -420,7 +165,6 @@ class CapacityTaskScheduler extends Task
     abstract int getSlotsPerTask(JobInProgress job);
     abstract int getRunningTasks(JobInProgress job);
     abstract int getPendingTasks(JobInProgress job);
-    abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
     abstract int getNumReservedTaskTrackers(JobInProgress job);
     
     /**
@@ -446,13 +190,13 @@ class CapacityTaskScheduler extends Task
     }
     
     /**
-     * List of QSIs for assigning tasks.
+     * List of Queues for assigning tasks.
      * Queues are ordered by a ratio of (# of running tasks)/capacity, which
      * indicates how much 'free space' the queue has, or how much it is over
      * capacity. This ordered list is iterated over, when assigning tasks.
      */  
-    private List<QueueSchedulingInfo> qsiForAssigningTasks = 
-      new ArrayList<QueueSchedulingInfo>();
+    private List<CapacitySchedulerQueue> queuesForAssigningTasks = 
+      new ArrayList<CapacitySchedulerQueue>();
 
     /**
      * Comparator to sort queues.
@@ -460,33 +204,37 @@ class CapacityTaskScheduler extends Task
      * reducers, we use reduceTSI. So we'll need separate comparators.  
      */ 
     private static abstract class QueueComparator 
-      implements Comparator<QueueSchedulingInfo> {
-      abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
-      public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
-        TaskSchedulingInfo t1 = getTSI(q1);
-        TaskSchedulingInfo t2 = getTSI(q2);
+      implements Comparator<CapacitySchedulerQueue> {
+      abstract TaskType getTaskType();
+      
+      public int compare(CapacitySchedulerQueue q1, CapacitySchedulerQueue q2) {
         // look at how much capacity they've filled. Treat a queue with
         // capacity=0 equivalent to a queue running at capacity
-        double r1 = (0 == t1.getCapacity())? 1.0f:
-          (double)t1.numSlotsOccupied/(double) t1.getCapacity();
-        double r2 = (0 == t2.getCapacity())? 1.0f:
-          (double)t2.numSlotsOccupied/(double) t2.getCapacity();
+        TaskType taskType = getTaskType();
+        double r1 = (0 == q1.getCapacity(taskType))? 1.0f:
+          (double)q1.getNumSlotsOccupied(taskType)/(double) q1.getCapacity(taskType);
+        double r2 = (0 == q2.getCapacity(taskType))? 1.0f:
+          (double)q2.getNumSlotsOccupied(taskType)/(double) q2.getCapacity(taskType);
         if (r1<r2) return -1;
         else if (r1>r2) return 1;
         else return 0;
       }
     }
+    
     // subclass for map and reduce comparators
     private static final class MapQueueComparator extends QueueComparator {
-      TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-        return qsi.mapTSI;
+      @Override
+      TaskType getTaskType() {
+        return TaskType.MAP;
       }
     }
     private static final class ReduceQueueComparator extends QueueComparator {
-      TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-        return qsi.reduceTSI;
+      @Override
+      TaskType getTaskType() {
+        return TaskType.REDUCE;
       }
     }
+    
     // these are our comparator instances
     protected final static MapQueueComparator mapComparator = new MapQueueComparator();
     protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
@@ -496,9 +244,9 @@ class CapacityTaskScheduler extends Task
     // Returns queues sorted according to the QueueComparator.
     // Mainly for testing purposes.
     String[] getOrderedQueues() {
-      List<String> queues = new ArrayList<String>(qsiForAssigningTasks.size());
-      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        queues.add(qsi.queueName);
+      List<String> queues = new ArrayList<String>(queuesForAssigningTasks.size());
+      for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
+        queues.add(queue.queueName);
       }
       return queues.toArray(new String[queues.size()]);
     }
@@ -508,14 +256,15 @@ class CapacityTaskScheduler extends Task
     }
     
     // let the scheduling mgr know which queues are in the system
-    void initialize(Map<String, QueueSchedulingInfo> qsiMap) { 
-      // add all the qsi objects to our list and sort
-      qsiForAssigningTasks.addAll(qsiMap.values());
-      Collections.sort(qsiForAssigningTasks, queueComparator);
+    void initialize(Map<String, CapacitySchedulerQueue> queues) { 
+      // add all the queue objects to our list and sort
+      queuesForAssigningTasks.clear();
+      queuesForAssigningTasks.addAll(queues.values());
+      Collections.sort(queuesForAssigningTasks, queueComparator);
     }
     
-    private synchronized void updateCollectionOfQSIs() {
-      Collections.sort(qsiForAssigningTasks, queueComparator);
+    private synchronized void sortQueues() {
+      Collections.sort(queuesForAssigningTasks, queueComparator);
     }
 
     /**
@@ -536,35 +285,6 @@ class CapacityTaskScheduler extends Task
       return (a + (b - 1)) / b;
     }
     
-    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
-      // what is our current capacity? It is equal to the queue-capacity if
-      // we're running below capacity. If we're running over capacity, then its
-      // #running plus slotPerTask of the job (which is the number of extra
-      // slots we're getting).
-      int currentCapacity;
-      TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numSlotsOccupied < tsi.getCapacity()) {
-        currentCapacity = tsi.getCapacity();
-      }
-      else {
-        currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
-      }
-      int limit = 
-        Math.max(divideAndCeil(currentCapacity, qsi.numJobsByUser.size()), 
-                 divideAndCeil(qsi.ulMin*currentCapacity, 100));
-      String user = j.getProfile().getUser();
-      if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("User " + user + " is over limit, num slots occupied=" + 
-                    tsi.numSlotsOccupiedByUser.get(user) + ", limit=" + limit);
-        }
-        return true;
-      }
-      else {
-        return false;
-      }
-    }
-
     /*
      * This is the central scheduling method. 
      * It tries to get a task from jobs in a single queue. 
@@ -572,29 +292,26 @@ class CapacityTaskScheduler extends Task
      */
     private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
                                               int availableSlots,
-                                              QueueSchedulingInfo qsi,
+                                              CapacitySchedulerQueue queue,
                                               boolean assignOffSwitch)
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
       // we only look at jobs in the running queues, as these are the ones
       // who have been potentially initialized
 
-      for (JobInProgress j : 
-        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+      for (JobInProgress j : queue.getRunningJobs()) {
         // only look at jobs that can be run. We ignore jobs that haven't 
         // initialized, or have completed but haven't been removed from the 
         // running queue. 
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        //Check if queue is over maximum-capacity
-        if(this.areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+        
+        // Check to ensure that the job/user/queue are under limits
+        if (!queue.assignSlotsToJob(type, j, j.getProfile().getUser())) {
           continue;
         }
-        // check if the job's user is over limit
-        if (isUserOverLimit(j, qsi)) {
-          continue;
-        } 
+
         //If this job meets memory requirements. Ask the JobInProgress for
         //a task to be scheduled on the task tracker.
         //if we find a job then we pass it on.
@@ -640,72 +357,16 @@ class CapacityTaskScheduler extends Task
         // if we're here, this job has no task to run. Look at the next job.
       }//end of for loop
 
-      // if we're here, we haven't found any task to run among all jobs in 
-      // the queue. This could be because there is nothing to run, or that 
-      // the user limit for some user is too strict, i.e., there's at least 
-      // one user who doesn't have enough tasks to satisfy his limit. If 
-      // it's the latter case, re-look at jobs without considering user 
-      // limits, and get a task from the first eligible job; however
-      // we do not 'reserve' slots on tasktrackers anymore since the user is 
-      // already over the limit
-      // Note: some of the code from above is repeated here. This is on 
-      // purpose as it improves overall readability.  
-      // Note: we walk through jobs again. Some of these jobs, which weren't
-      // considered in the first pass, shouldn't be considered here again, 
-      // but we still check for their viability to keep the code simple. In
-      // some cases, for high mem jobs that have nothing to run, we call 
-      // obtainNewTask() unnecessarily. Should this be a problem, we can 
-      // create a list of jobs to look at (those whose users were over 
-      // limit) in the first pass and walk through that list only. 
-      for (JobInProgress j : 
-        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
-          continue;
-        }
-        //Check if queue is over maximum-capacity
-        if (this.areTasksInQueueOverMaxCapacity(
-          qsi, j.getNumSlotsPerTask(type))) {
-          continue;
-        }
-        
-        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
-            taskTrackerStatus, availableSlots)) {
-          // We found a suitable job. Get task from it.
-          TaskLookupResult tlr = 
-            obtainNewTask(taskTrackerStatus, j, assignOffSwitch);
-          
-          if (tlr.getLookUpStatus() == 
-                  TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND || 
-              tlr.getLookUpStatus() == 
-                  TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
-            // we're successful in getting a task
-            return tlr;
-          } else {
-            //skip to the next job in the queue.
-            continue;
-          }
-        } else {
-          //if memory requirements don't match then we check if the 
-          //job has either pending or speculative task. If the job
-          //has pending or speculative task we block till this job
-          //tasks get scheduled, so that high memory jobs are not 
-          //starved
-          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus)) {
-            return TaskLookupResult.getMemFailedResult();
-          } 
-        }//end of memory check block
-      }//end of for loop
-
       // found nothing for this queue, look at the next one.
       if (LOG.isDebugEnabled()) {
-        String msg = "Found no task from the queue " + qsi.queueName;
+        String msg = "Found no task from the queue " + queue.queueName;
         LOG.debug(msg);
       }
       return TaskLookupResult.getNoTaskFoundResult();
     }
 
     // Always return a TaskLookupResult object. Don't return null. 
-    // The caller is responsible for ensuring that the QSI objects and the 
+    // The caller is responsible for ensuring that the Queue objects and the 
     // collections are up-to-date.
     private TaskLookupResult assignTasks(TaskTracker taskTracker, 
                                          int availableSlots, 
@@ -713,7 +374,7 @@ class CapacityTaskScheduler extends Task
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
 
-      printQSIs();
+      printQueues();
 
       // Check if this tasktracker has been reserved for a job...
       JobInProgress job = taskTracker.getJobForFallowSlot(type);
@@ -748,20 +409,21 @@ class CapacityTaskScheduler extends Task
       }
       
       
-      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
+      for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
         // we may have queues with capacity=0. We shouldn't look at jobs from 
         // these queues
-        if (0 == getTSI(qsi).getCapacity()) {
+        if (0 == queue.getCapacity(TaskType.MAP)) {
           continue;
         }
 
         //This call is for optimization if we are already over the
         //maximum-capacity we avoid traversing the queues.
-        if(this.areTasksInQueueOverMaxCapacity(qsi,1)) {
+        if (!queue.assignSlotsToQueue(type, 1)) {
           continue;
         }
+        
         TaskLookupResult tlr = 
-          getTaskFromQueue(taskTracker, availableSlots, qsi, assignOffSwitch);
+          getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch);
         TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
 
         if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
@@ -784,77 +446,26 @@ class CapacityTaskScheduler extends Task
       return TaskLookupResult.getNoTaskFoundResult();
     }
 
-
-    /**
-     * Check if maximum-capacity is set for this queue.
-     * If set and greater than 0 ,
-     * check if numofslotsoccupied+numSlotsPerTask is greater than
-     * maximum-capacity , if yes , implies this queue is over limit.
-     *
-     * Incase noOfSlotsOccupied is less than maximum-capacity ,but ,
-     * numOfSlotsOccupied + noSlotsPerTask is more than maximum-capacity we
-     * still dont assign the task . This may lead to under utilization of very
-     * small set of slots. But this is ok , as we strictly respect the
-     * maximum-capacity limit.
-     * 
-     * @param qsi
-     * @return true if queue is over limit.
-     */
-
-    private boolean areTasksInQueueOverMaxCapacity(
-      QueueSchedulingInfo qsi, int numSlotsPerTask) {
-      TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.getMaxCapacity() >= 0) {
-        if ((tsi.numSlotsOccupied + numSlotsPerTask) > tsi.getMaxCapacity()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-              "Queue " + qsi.queueName + " " + "has reached its  max " + type +
-                "Capacity");
-            LOG.debug("Current running tasks " + tsi.getCapacity());
-
-          }
-          return true;
-        }
-      }
-      return false;
-    }
-
     // for debugging.
-    private void printQSIs() {
+    private void printQueues() {
       if (LOG.isDebugEnabled()) {
         StringBuffer s = new StringBuffer();
-        for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-          TaskSchedulingInfo tsi = getTSI(qsi);
-          Collection<JobInProgress> runJobs =
-            scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+        for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
+          Collection<JobInProgress> runJobs = queue.getRunningJobs();
           s.append(
             String.format(
               " Queue '%s'(%s): runningTasks=%d, "
                 + "occupiedSlots=%d, capacity=%d, runJobs=%d  maxCapacity=%d ",
-              qsi.queueName,
-              this.type, Integer.valueOf(tsi.numRunningTasks), Integer
-                .valueOf(tsi.numSlotsOccupied), Integer
-                .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
-              Integer.valueOf(tsi.getMaxCapacity())));
+              queue.queueName,
+              this.type, 
+              Integer.valueOf(queue.getNumRunningTasks(type)), 
+              Integer.valueOf(queue.getNumSlotsOccupied(type)), 
+              Integer.valueOf(queue.getCapacity(type)), 
+              Integer.valueOf(runJobs.size()),
+              Integer.valueOf(queue.getMaxCapacity(type))));
         }
         LOG.debug(s);
       }
-      
-      StringBuffer s = new StringBuffer();
-      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        TaskSchedulingInfo tsi = getTSI(qsi);
-        Collection<JobInProgress> runJobs =
-          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
-        s.append(
-          String.format(
-            " Queue '%s'(%s): runningTasks=%d, "
-              + "occupiedSlots=%d, capacity=%d, runJobs=%d  maxCapacity=%d ",
-            qsi.queueName,
-            this.type, Integer.valueOf(tsi.numRunningTasks), Integer
-              .valueOf(tsi.numSlotsOccupied), Integer
-              .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
-            Integer.valueOf(tsi.getMaxCapacity())));
-      }
     }
     
     /**
@@ -892,16 +503,6 @@ class CapacityTaskScheduler extends Task
     }
 
     @Override
-    void updateTSI(QueueSchedulingInfo qsi, String user, 
-                   int numRunningTasks, int numSlotsOccupied) {
-      qsi.mapTSI.numRunningTasks += numRunningTasks;
-      qsi.mapTSI.numSlotsOccupied += numSlotsOccupied;
-      Integer i = qsi.mapTSI.numSlotsOccupiedByUser.get(user);
-      int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
-      qsi.mapTSI.numSlotsOccupiedByUser.put(user, slots);
-    }
-    
-    @Override
     TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
                                    JobInProgress job, boolean assignOffSwitch) 
     throws IOException {
@@ -955,11 +556,6 @@ class CapacityTaskScheduler extends Task
       return job.getNumSlotsPerTask(TaskType.MAP);
     }
 
-    @Override
-    TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-      return qsi.mapTSI;
-    }
-
     int getNumReservedTaskTrackers(JobInProgress job) {
       return job.getNumReservedTaskTrackersForMaps();
     }
@@ -972,7 +568,6 @@ class CapacityTaskScheduler extends Task
           hasSpeculativeTask(job.getTasks(TaskType.MAP), 
               job.getStatus().mapProgress(), tts));
     }
-
   }
 
   /**
@@ -987,16 +582,6 @@ class CapacityTaskScheduler extends Task
     }
 
     @Override
-    void updateTSI(QueueSchedulingInfo qsi, String user, 
-                   int numRunningTasks, int numSlotsOccupied) {
-      qsi.reduceTSI.numRunningTasks += numRunningTasks;
-      qsi.reduceTSI.numSlotsOccupied += numSlotsOccupied;
-      Integer i = qsi.reduceTSI.numSlotsOccupiedByUser.get(user);
-      qsi.reduceTSI.numSlotsOccupiedByUser.put(user,
-          Integer.valueOf(i.intValue() + numSlotsOccupied));
-    }
-
-    @Override
     TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
                                    JobInProgress job, boolean unused) 
     throws IOException {
@@ -1031,11 +616,6 @@ class CapacityTaskScheduler extends Task
       return job.getNumSlotsPerTask(TaskType.REDUCE);    
     }
 
-    @Override
-    TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-      return qsi.reduceTSI;
-    }
-
     int getNumReservedTaskTrackers(JobInProgress job) {
       return job.getNumReservedTaskTrackersForReduces();
     }
@@ -1048,7 +628,6 @@ class CapacityTaskScheduler extends Task
           hasSpeculativeTask(job.getTasks(TaskType.REDUCE), 
               job.getStatus().reduceProgress(), tts));
     }
-
   }
   
   /** the scheduling mgrs for Map and Reduce tasks */ 
@@ -1057,11 +636,6 @@ class CapacityTaskScheduler extends Task
 
   MemoryMatcher memoryMatcher = new MemoryMatcher(this);
 
-  /** we keep track of the number of map/reduce slots we saw last */
-  private int prevMapClusterCapacity = 0;
-  private int prevReduceClusterCapacity = 0;
-  
-    
   static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
   protected JobQueuesManager jobQueuesManager;
   protected CapacitySchedulerConf schedConf;
@@ -1099,8 +673,45 @@ class CapacityTaskScheduler extends Task
   public void setResourceManagerConf(CapacitySchedulerConf conf) {
     this.schedConf = conf;
   }
-
-  private void initializeMemoryRelatedConf() {
+  
+  @Override
+  public synchronized void refresh() throws IOException {
+    Configuration conf = new Configuration();
+    CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
+    
+    // Refresh
+    QueueManager queueManager = taskTrackerManager.getQueueManager();
+    Set<String> queueNames = queueManager.getQueues();
+    Map<String, CapacitySchedulerQueue> newQueues =
+      parseQueues(queueManager.getQueues(), schedConf);
+    
+    // Check to ensure no queue has been deleted
+    checkForQueueDeletion(queueInfoMap, newQueues);
+    
+    // Re-intialize the scheduler
+    initialize(queueManager, newQueues, conf, schedConf);
+    
+    // Inform the job-init-poller
+    initializationPoller.reinit(queueNames);
+    
+    // Finally, reset the configuration
+    setConf(conf);
+    this.schedConf = schedConf;
+  }
+
+  private void 
+  checkForQueueDeletion(Map<String, CapacitySchedulerQueue> currentQueues, 
+      Map<String, CapacitySchedulerQueue> newQueues) 
+  throws IOException {
+    for (String queueName : currentQueues.keySet()) {
+      if (!newQueues.containsKey(queueName)) {
+        throw new IOException("Couldn't find queue '" + queueName + 
+            "' during refresh!");
+      }
+    }
+  }
+  
+  private void initializeMemoryRelatedConf(Configuration conf) {
     //handling @deprecated
     if (conf.get(
       CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY) !=
@@ -1208,68 +819,11 @@ class CapacityTaskScheduler extends Task
       schedConf = new CapacitySchedulerConf();
     }
 
-    initializeMemoryRelatedConf();
-    
-    // read queue info from config file
+    // Initialize queues
     QueueManager queueManager = taskTrackerManager.getQueueManager();
-    Set<String> queues = queueManager.getQueues();
-    // Sanity check: there should be at least one queue. 
-    if (0 == queues.size()) {
-      throw new IllegalStateException("System has no queue configured");
-    }
-
-    Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
-    float totalCapacityPercent = 0.0f;
-    for (String queueName: queues) {
-      float capacityPercent = schedConf.getCapacity(queueName);
-      if (capacityPercent == -1.0) {
-        queuesWithoutConfiguredCapacity.add(queueName);
-      }else {
-        totalCapacityPercent += capacityPercent;
-      }
-
-      float maxCapacityPercent = schedConf.getMaxCapacity(queueName);
-      int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
-      // create our QSI and add to our hashmap
-      QueueSchedulingInfo qsi = new QueueSchedulingInfo(
-        queueName, capacityPercent, maxCapacityPercent ,ulMin, jobQueuesManager);
-      queueInfoMap.put(queueName, qsi);
-
-      // create the queues of job objects
-      boolean supportsPrio = schedConf.isPrioritySupported(queueName);
-      jobQueuesManager.createQueue(queueName, supportsPrio);
-      
-      SchedulingDisplayInfo schedulingInfo = 
-        new SchedulingDisplayInfo(queueName, this);
-      queueManager.setSchedulerInfo(queueName, schedulingInfo);
-      
-    }
-    float remainingQuantityToAllocate = 100 - totalCapacityPercent;
-    float quantityToAllocate = 
-      remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
-    for(String queue: queuesWithoutConfiguredCapacity) {
-      QueueSchedulingInfo qsi = queueInfoMap.get(queue); 
-      qsi.capacityPercent = quantityToAllocate;
-      if(qsi.maxCapacityPercent >= 0) {
-        if(qsi.capacityPercent > qsi.maxCapacityPercent) {
-          throw new IllegalStateException(
-            " Allocated capacity of " + qsi.capacityPercent +
-              " to unconfigured queue " + qsi.queueName +
-              " is greater than maximum Capacity " + qsi.maxCapacityPercent);
-        }
-      }
-      schedConf.setCapacity(queue, quantityToAllocate);
-    }    
-    
-    if (totalCapacityPercent > 100.0) {
-      throw new IllegalArgumentException(
-        "Sum of queue capacities over 100% at "
-          + totalCapacityPercent);
-    }    
-    
-    // let our mgr objects know about the queues
-    mapScheduler.initialize(queueInfoMap);
-    reduceScheduler.initialize(queueInfoMap);
+    Set<String> queueNames = queueManager.getQueues();
+    initialize(queueManager, parseQueues(queueNames, schedConf), 
+        getConf(), schedConf);
     
     // listen to job changes
     taskTrackerManager.addJobInProgressListener(jobQueuesManager);
@@ -1277,9 +831,9 @@ class CapacityTaskScheduler extends Task
     //Start thread for initialization
     if (initializationPoller == null) {
       this.initializationPoller = new JobInitializationPoller(
-          jobQueuesManager,schedConf,queues, taskTrackerManager);
+          jobQueuesManager, schedConf, queueNames, taskTrackerManager);
     }
-    initializationPoller.init(queueManager.getQueues(), schedConf);
+    initializationPoller.init(queueNames.size(), schedConf);
     initializationPoller.setDaemon(true);
     initializationPoller.start();
 
@@ -1292,7 +846,79 @@ class CapacityTaskScheduler extends Task
     }
 
     started = true;
-    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  
+    LOG.info("Capacity scheduler initialized " + queueNames.size() + " queues");  
+  }
+  
+  
+  void initialize(QueueManager queueManager,
+      Map<String, CapacitySchedulerQueue> newQueues,
+      Configuration conf, CapacitySchedulerConf schedConf) {
+    // Memory related configs
+    initializeMemoryRelatedConf(conf);
+
+    // Setup queues
+    for (Map.Entry<String, CapacitySchedulerQueue> e : newQueues.entrySet()) {
+      String newQueueName = e.getKey();
+      CapacitySchedulerQueue newQueue = e.getValue();
+      CapacitySchedulerQueue currentQueue = queueInfoMap.get(newQueueName);
+      if (currentQueue != null) {
+        currentQueue.initializeQueue(newQueue);
+        LOG.info("Updated queue configs for " + newQueueName);
+      } else {
+        queueInfoMap.put(newQueueName, newQueue);
+        LOG.info("Added new queue: " + newQueueName);
+      }
+    }
+
+    // Set SchedulingDisplayInfo
+    for (String queueName : queueInfoMap.keySet()) {
+      SchedulingDisplayInfo schedulingInfo = 
+        new SchedulingDisplayInfo(queueName, this);
+      queueManager.setSchedulerInfo(queueName, schedulingInfo);
+    }
+
+    // Inform the queue manager 
+    jobQueuesManager.setQueues(queueInfoMap);
+    
+    // let our mgr objects know about the queues
+    mapScheduler.initialize(queueInfoMap);
+    reduceScheduler.initialize(queueInfoMap);
+  }
+  
+  Map<String, CapacitySchedulerQueue> 
+  parseQueues(Collection<String> queueNames, CapacitySchedulerConf schedConf) 
+  throws IOException {
+    Map<String, CapacitySchedulerQueue> queueInfoMap = 
+      new HashMap<String, CapacitySchedulerQueue>();
+    
+    // Sanity check: there should be at least one queue. 
+    if (0 == queueNames.size()) {
+      throw new IllegalStateException("System has no queue configured");
+    }
+
+    float totalCapacityPercent = 0.0f;
+    for (String queueName: queueNames) {
+      float capacityPercent = schedConf.getCapacity(queueName);
+      if (capacityPercent == -1.0) {
+        throw new IOException("Queue '" + queueName + 
+            "' doesn't have configured capacity!");
+      } 
+      
+      totalCapacityPercent += capacityPercent;
+
+      // create our Queue and add to our hashmap
+      CapacitySchedulerQueue queue = 
+        new CapacitySchedulerQueue(queueName, schedConf);
+      queueInfoMap.put(queueName, queue);
+    }
+    
+    if (Math.floor(totalCapacityPercent) != 100) {
+      throw new IllegalArgumentException(
+        "Sum of queue capacities not 100% at "
+          + totalCapacityPercent);
+    }    
+
+    return queueInfoMap;
   }
   
   /** mostly for testing purposes */
@@ -1319,125 +945,35 @@ class CapacityTaskScheduler extends Task
 
   /**
    * provided for the test classes
-   * lets you update the QSI objects and sorted collections
+   * lets you update the Queue objects and sorted collections
    */ 
-  void updateQSIInfoForTests() {
+  void updateQueueUsageForTests() {
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
     int reduceClusterCapacity = c.getMaxReduceTasks();
-    // update the QSI objects
-    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
-    mapScheduler.updateCollectionOfQSIs();
-    reduceScheduler.updateCollectionOfQSIs();
-    mapScheduler.printQSIs();
-    reduceScheduler.printQSIs();
+    // update the Queue objects
+    updateAllQueues(mapClusterCapacity, reduceClusterCapacity);
+    mapScheduler.sortQueues();
+    reduceScheduler.sortQueues();
+    mapScheduler.printQueues();
+    reduceScheduler.printQueues();
   }
 
   /**
-   * Update individual QSI objects.
+   * Update all queues to reflect current usage.
    * We don't need exact information for all variables, just enough for us
    * to make scheduling decisions. For example, we don't need an exact count
    * of numRunningTasks. Once we count upto the grid capacity, any
    * number beyond that will make no difference.
    */
-  private synchronized void updateQSIObjects(
-    int mapClusterCapacity,
-      int reduceClusterCapacity) {
+  private synchronized void updateAllQueues(
+    int mapClusterCapacity, int reduceClusterCapacity) {
     // if # of slots have changed since last time, update.
     // First, compute whether the total number of TT slots have changed
-    for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
-      // compute new capacities, if TT slots have changed
-      if (mapClusterCapacity != prevMapClusterCapacity) {
-        qsi.mapTSI.setCapacity(
-          (int)
-          (qsi.capacityPercent*mapClusterCapacity/100));
-
-        //compute new max map capacities
-        if(qsi.maxCapacityPercent > 0) {
-          qsi.mapTSI.setMaxCapacity(
-            (int) (qsi.maxCapacityPercent * mapClusterCapacity / 100));
-        }
-      }
-      if (reduceClusterCapacity != prevReduceClusterCapacity) {
-        qsi.reduceTSI.setCapacity(
-          (int)
-            (qsi.capacityPercent * reduceClusterCapacity / 100));
-
-        //compute new max reduce capacities
-        if (qsi.maxCapacityPercent > 0) {
-          qsi.reduceTSI.setMaxCapacity(
-            (int) (qsi.maxCapacityPercent * reduceClusterCapacity / 100));
-        }
-      }
-
-      // reset running/pending tasks, tasks per user
-      qsi.mapTSI.resetTaskVars();
-      qsi.reduceTSI.resetTaskVars();
-      // update stats on running jobs
-      for (JobInProgress j:
-        jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
-          continue;
-        }
-
-        int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
-        int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
-        int numRunningMapSlots = 
-          numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
-        int numRunningReduceSlots =
-          numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
-        int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
-        int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
-        int numReservedMapSlotsForThisJob = 
-          (mapScheduler.getNumReservedTaskTrackers(j) * 
-           mapScheduler.getSlotsPerTask(j)); 
-        int numReservedReduceSlotsForThisJob = 
-          (reduceScheduler.getNumReservedTaskTrackers(j) * 
-           reduceScheduler.getSlotsPerTask(j)); 
-        
-        j.setSchedulingInfo(getJobQueueSchedInfo(numMapsRunningForThisJob, 
-                              numRunningMapSlots,
-                              numReservedMapSlotsForThisJob,
-                              numReducesRunningForThisJob, 
-                              numRunningReduceSlots,
-                              numReservedReduceSlotsForThisJob));
-        
-        mapScheduler.updateTSI(qsi, j.getProfile().getUser(), 
-                               numMapsRunningForThisJob, numMapSlotsForThisJob);
-        reduceScheduler.updateTSI(qsi, j.getProfile().getUser(), 
-                                  numReducesRunningForThisJob, 
-                                  numReduceSlotsForThisJob);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
-              + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
-              + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
-              + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
-              .getJobID().toString(), Integer
-              .valueOf(numMapsRunningForThisJob), Integer
-              .valueOf(numMapSlotsForThisJob), Integer
-              .valueOf(numReducesRunningForThisJob), Integer
-              .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
-              .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
-              .valueOf(j.failedMapTasks),
-              Integer.valueOf(j.failedReduceTasks), Integer
-                  .valueOf(j.speculativeMapTasks), Integer
-                  .valueOf(j.speculativeReduceTasks), Integer
-                  .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
-        }
-
-        /*
-         * it's fine walking down the entire list of running jobs - there
-         * probably will not be many, plus, we may need to go through the
-         * list to compute numSlotsOccupiedByUser. If this is expensive, we
-         * can keep a list of running jobs per user. Then we only need to
-         * consider the first few jobs per user.
-         */
-      }
+    for (CapacitySchedulerQueue queue: queueInfoMap.values()) {
+      queue.updateAll(mapClusterCapacity, reduceClusterCapacity, 
+                 mapScheduler, reduceScheduler);
     }
-
-    prevMapClusterCapacity = mapClusterCapacity;
-    prevReduceClusterCapacity = reduceClusterCapacity;
   }
 
   private static final int JOBQUEUE_SCHEDULINGINFO_INITIAL_LENGTH = 175;
@@ -1493,17 +1029,16 @@ class CapacityTaskScheduler extends Task
     }
 
     /* 
-     * update all our QSI objects.
-     * This involves updating each qsi structure. This operation depends
+     * update all our queues
+     * This involves updating each queue structure. This operation depends
      * on the number of running jobs in a queue, and some waiting jobs. If it
      * becomes expensive, do it once every few heartbeats only.
      */ 
-    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+    updateAllQueues(mapClusterCapacity, reduceClusterCapacity);
     
     // schedule tasks
     List<Task> result = new ArrayList<Task>();
     addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
-    int numMapsAssigned = result.size(); 
     addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
     return result;
   }
@@ -1515,7 +1050,7 @@ class CapacityTaskScheduler extends Task
                     throws IOException {
     int availableSlots = maxReduceSlots - currentReduceSlots;
     if (availableSlots > 0) {
-      reduceScheduler.updateCollectionOfQSIs();
+      reduceScheduler.sortQueues();
       TaskLookupResult tlr = 
         reduceScheduler.assignTasks(taskTracker, availableSlots, true);
       if (TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND == tlr.getLookUpStatus()) {
@@ -1532,7 +1067,7 @@ class CapacityTaskScheduler extends Task
     int availableSlots = maxMapSlots - currentMapSlots;
     boolean assignOffSwitch = true;
     while (availableSlots > 0) {
-      mapScheduler.updateCollectionOfQSIs();
+      mapScheduler.sortQueues();
       TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker, 
                                                       availableSlots,
                                                       assignOffSwitch);
@@ -1553,39 +1088,33 @@ class CapacityTaskScheduler extends Task
         // Atmost 1 off-switch task per-heartbeat
         assignOffSwitch = false;
       }
+      
+      // Assigned some slots
       availableSlots -= t.getNumSlotsRequired();
-      mapScheduler.updateTSI(queueInfoMap.get(job.getProfile().getQueueName()), 
-                             job.getProfile().getUser(), 1, 
-                             t.getNumSlotsRequired());
+      
+      // Update the queue
+      CapacitySchedulerQueue queue = 
+        queueInfoMap.get(job.getProfile().getQueueName());
+      queue.update(TaskType.MAP, 
+          job.getProfile().getUser(), 1, t.getNumSlotsRequired());
     }
   }
   
   // called when a job is added
   synchronized void jobAdded(JobInProgress job) throws IOException {
-    QueueSchedulingInfo qsi = 
+    CapacitySchedulerQueue queue = 
       queueInfoMap.get(job.getProfile().getQueueName());
-    // qsi shouldn't be null
-    // update user-specific info
-    Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
-    if (null == i) {
-      i = 1;
-      // set the count for running tasks to 0
-      qsi.mapTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
-          Integer.valueOf(0));
-      qsi.reduceTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
-          Integer.valueOf(0));
-    }
-    else {
-      i++;
-    }
-    qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+    
+    // Inform the queue
+    queue.jobAdded(job);
     
     // setup scheduler specific job information
     preInitializeJob(job);
     
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
-              + job.getProfile().getUser() + ", user now has " + i + " jobs");
+      String user = job.getProfile().getUser();
+      LOG.debug("Job " + job.getJobID() + " is added under user " + user + 
+                ", user now has " + queue.getNumJobsByUser(user) + " jobs");
     }
   }
 
@@ -1610,43 +1139,22 @@ class CapacityTaskScheduler extends Task
   
   // called when a job completes
   synchronized void jobCompleted(JobInProgress job) {
-    QueueSchedulingInfo qsi = 
+    CapacitySchedulerQueue queue = 
       queueInfoMap.get(job.getProfile().getQueueName());
-    // qsi shouldn't be null
-    // update numJobsByUser
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Job to be removed for user " + job.getProfile().getUser());
-    }
-    Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
-    i--;
-    if (0 == i.intValue()) {
-      qsi.numJobsByUser.remove(job.getProfile().getUser());
-      // remove job footprint from our TSIs
-      qsi.mapTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
-      qsi.reduceTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
-      }
-    }
-    else {
-      qsi.numJobsByUser.put(job.getProfile().getUser(), i);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("User still has " + i + " jobs, number of users = "
-                + qsi.numJobsByUser.size());
-      }
-    }
+    
+    // Inform the queue
+    queue.jobCompleted(job);
   }
   
   @Override
   public synchronized Collection<JobInProgress> getJobs(String queueName) {
     Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
-    Collection<JobInProgress> runningJobs = 
-        jobQueuesManager.getRunningJobQueue(queueName);
+    CapacitySchedulerQueue queue = queueInfoMap.get(queueName);
+    Collection<JobInProgress> runningJobs = queue.getRunningJobs();
     if (runningJobs != null) {
       jobCollection.addAll(runningJobs);
     }
-    Collection<JobInProgress> waitingJobs = 
-      jobQueuesManager.getWaitingJobs(queueName);
+    Collection<JobInProgress> waitingJobs = queue.getWaitingJobs();
     Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
     if(waitingJobs != null) {
       tempCollection.addAll(waitingJobs);
@@ -1669,7 +1177,7 @@ class CapacityTaskScheduler extends Task
     return jobQueuesManager;
   }
 
-  Map<String, QueueSchedulingInfo> getQueueInfoMap() {
+  Map<String, CapacitySchedulerQueue> getQueueInfoMap() {
     return queueInfoMap;
   }
 
@@ -1688,11 +1196,11 @@ class CapacityTaskScheduler extends Task
   }
 
   synchronized String getDisplayInfo(String queueName) {
-    QueueSchedulingInfo qsi = queueInfoMap.get(queueName);
-    if (null == qsi) { 
+    CapacitySchedulerQueue queue = queueInfoMap.get(queueName);
+    if (null == queue) { 
       return null;
     }
-    return qsi.toString();
+    return queue.toString();
   }
 
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Fri Mar  4 04:30:34 2011
@@ -19,11 +19,15 @@ package org.apache.hadoop.mapred;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -68,16 +72,6 @@ public class JobInitializationPoller ext
   private static final Log LOG = LogFactory
       .getLog(JobInitializationPoller.class.getName());
 
-  /*
-   * The poller picks up jobs across users to initialize based on user limits.
-   * Suppose the user limit for a queue is 25%, it means atmost 4 users' jobs
-   * can run together. However, in order to account for jobs from a user that
-   * might complete faster than others, it initializes jobs from an additional
-   * number of users as a backlog. This variable defines the additional
-   * number of users whose jobs can be considered for initializing. 
-   */
-  private static final int MAX_ADDITIONAL_USERS_TO_INIT = 2;
-
   private JobQueuesManager jobQueueManager;
   private long sleepInterval;
   private int poolSize;
@@ -100,11 +94,12 @@ public class JobInitializationPoller ext
      * The hash map which maintains relationship between queue to jobs to
      * initialize per queue.
      */
-    private HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
+    private Map<String, Map<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
 
     public JobInitializationThread() {
       startIniting = true;
-      jobsPerQueue = new HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>>();
+      jobsPerQueue = 
+        new ConcurrentHashMap<String, Map<JobSchedulingInfo, JobInProgress>>();
     }
 
     @Override
@@ -156,8 +151,7 @@ public class JobInitializationPoller ext
      * @return First job in the queue and removes it.
      */
     private JobInProgress getFirstJobInQueue(String queue) {
-      TreeMap<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue
-          .get(queue);
+      Map<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue.get(queue);
       synchronized (jobsList) {
         if (jobsList.isEmpty()) {
           return null;
@@ -186,8 +180,7 @@ public class JobInitializationPoller ext
     }
 
     void addJobsToQueue(String queue, JobInProgress job) {
-      TreeMap<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue
-          .get(queue);
+      Map<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue.get(queue);
       if (jobs == null) {
         LOG.error("Invalid queue passed to the thread : " + queue
             + " For job :: " + job.getJobID());
@@ -199,43 +192,20 @@ public class JobInitializationPoller ext
       }
     }
 
-    void addQueue(String queue) {
-      TreeMap<JobSchedulingInfo, JobInProgress> jobs = new TreeMap<JobSchedulingInfo, JobInProgress>(
-          jobQueueManager.getComparator(queue));
-      jobsPerQueue.put(queue, jobs);
-    }
-  }
-
-  /**
-   * The queue information class maintains following information per queue:
-   * Maximum users allowed to initialize job in the particular queue. Maximum
-   * jobs allowed to be initialize per user in the queue.
-   * 
-   */
-  private class QueueInfo {
-    String queue;
-    int maxUsersAllowedToInitialize;
-    int maxJobsPerUserToInitialize;
+    void addQueue(String queueName) {
+      CapacitySchedulerQueue queue = jobQueueManager.getQueue(queueName);
 
-    public QueueInfo(String queue, int maxUsersAllowedToInitialize,
-        int maxJobsPerUserToInitialize) {
-      this.queue = queue;
-      this.maxJobsPerUserToInitialize = maxJobsPerUserToInitialize;
-      this.maxUsersAllowedToInitialize = maxUsersAllowedToInitialize;
+      TreeMap<JobSchedulingInfo, JobInProgress> jobs = 
+        new TreeMap<JobSchedulingInfo, JobInProgress>(queue.getComparator());
+      jobsPerQueue.put(queueName, jobs);
     }
   }
 
   /**
-   * Map which contains the configuration used for initializing jobs
-   * in that associated to a particular job queue.
-   */
-  private HashMap<String, QueueInfo> jobQueues;
-
-  /**
    * Set of jobs which have been passed to Initialization threads.
    * This is maintained so that we dont call initTasks() for same job twice.
    */
-  private HashMap<JobID,JobInProgress> initializedJobs;
+  private HashMap<JobID, JobInProgress> initializedJobs;
 
   private volatile boolean running;
 
@@ -244,41 +214,34 @@ public class JobInitializationPoller ext
    * The map which provides information which thread should be used to
    * initialize jobs for a given job queue.
    */
-  private HashMap<String, JobInitializationThread> threadsToQueueMap;
+  private Map<String, JobInitializationThread> threadsToQueueMap;
 
   public JobInitializationPoller(JobQueuesManager mgr,
       CapacitySchedulerConf rmConf, Set<String> queue, 
       TaskTrackerManager ttm) {
     initializedJobs = new HashMap<JobID,JobInProgress>();
-    jobQueues = new HashMap<String, QueueInfo>();
     this.jobQueueManager = mgr;
-    threadsToQueueMap = new HashMap<String, JobInitializationThread>();
+    threadsToQueueMap = 
+      Collections.synchronizedMap(new HashMap<String, 
+          JobInitializationThread>());
     super.setName("JobInitializationPollerThread");
     running = true;
     this.ttm = ttm;
   }
 
+  void setTaskTrackerManager(TaskTrackerManager ttm) {
+    this.ttm = ttm;
+  }
+  
   /*
    * method to read all configuration values required by the initialisation
    * poller
    */
 
-  void init(Set<String> queues, 
+  void init(int numQueues, 
             CapacitySchedulerConf capacityConf) {
-    for (String queue : queues) {
-      int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
-      int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
-      int maxJobsPerUserToInitialize = capacityConf
-          .getMaxJobsPerUserToInitialize(queue);
-      QueueInfo qi = new QueueInfo(queue, maxUsersToInitialize,
-          maxJobsPerUserToInitialize);
-      jobQueues.put(queue, qi);
-    }
     sleepInterval = capacityConf.getSleepInterval();
-    poolSize = capacityConf.getMaxWorkerThreads();
-    if (poolSize > queues.size()) {
-      poolSize = queues.size();
-    }
+    poolSize = Math.min(capacityConf.getMaxWorkerThreads(), numQueues);
     assignThreadsToQueues();
     Collection<JobInitializationThread> threads = threadsToQueueMap.values();
     for (JobInitializationThread t : threads) {
@@ -289,6 +252,20 @@ public class JobInitializationPoller ext
     }
   }
 
+  void reinit(Set<String> queues) {
+    Set<String> oldQueues = threadsToQueueMap.keySet();
+    int i=0;
+    JobInitializationThread[] threads = 
+      threadsToQueueMap.values().toArray(new JobInitializationThread[0]);
+    for (String newQueue : queues) {
+      if (!oldQueues.contains(newQueue)) {
+        JobInitializationThread t = threads[i++ % threads.length];
+        t.addQueue(newQueue);
+        threadsToQueueMap.put(newQueue, t);
+      }
+    }
+  }
+  
   /**
    * This is main thread of initialization poller, We essentially do 
    * following in the main threads:
@@ -323,7 +300,7 @@ public class JobInitializationPoller ext
    * 
    */
   void selectJobsToInitialize() {
-    for (String queue : jobQueues.keySet()) {
+    for (String queue : jobQueueManager.getAllQueues()) {
       ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
       printJobs(jobsToInitialize);
       JobInitializationThread t = threadsToQueueMap.get(queue);
@@ -368,8 +345,9 @@ public class JobInitializationPoller ext
    * 
    */
   private void assignThreadsToQueues() {
-    int countOfQueues = jobQueues.size();
-    String[] queues = (String[]) jobQueues.keySet().toArray(
+    Collection<String> queueNames = jobQueueManager.getAllQueues();
+    int countOfQueues = queueNames.size();
+    String[] queues = (String[]) queueNames.toArray(
         new String[countOfQueues]);
     int numberOfQueuesPerThread = countOfQueues / poolSize;
     int numberOfQueuesAssigned = 0;
@@ -425,22 +403,21 @@ public class JobInitializationPoller ext
    * already been initialized. The latter user's initialized jobs are redundant,
    * but we'll leave them initialized.
    * 
-   * @param queue name of the queue to pick the jobs to initialize.
+   * @param queueName name of the queue to pick the jobs to initialize.
    * @return list of jobs to be initalized in a queue. An empty queue is
    *         returned if no jobs are found.
    */
-  ArrayList<JobInProgress> getJobsToInitialize(String queue) {
-    QueueInfo qi = jobQueues.get(queue);
+  ArrayList<JobInProgress> getJobsToInitialize(String queueName) {
+    CapacitySchedulerQueue queue = jobQueueManager.getQueue(queueName);
     ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
-    // use the configuration parameter which is configured for the particular
-    // queue.
-    int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
-    int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
-    int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
-        * maxJobsPerUserAllowedToInitialize;
+
     int countOfJobsInitialized = 0;
-    HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
-    Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
+    int countOfTasksInitialized = 0;
+    Map<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
+    Map<String, Integer> userTasksInitialized = new HashMap<String, Integer>();
+    Set<String> usersOverLimit = new HashSet<String>();
+    Collection<JobInProgress> jobs = queue.getWaitingJobs();
+    
     /*
      * Walk through the collection of waiting jobs.
      *  We maintain a map of jobs that have already been initialized. If a 
@@ -456,40 +433,54 @@ public class JobInitializationPoller ext
      */
     for (JobInProgress job : jobs) {
       String user = job.getProfile().getUser();
-      int numberOfJobs = userJobsInitialized.get(user) == null ? 0
-          : userJobsInitialized.get(user);
-      // If the job is already initialized then add the count against user
-      // then continue.
+      // If the job is already initialized then continue.
       if (initializedJobs.containsKey(job.getJobID())) {
-        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
-        countOfJobsInitialized++;
         continue;
       }
-      boolean isUserPresent = userJobsInitialized.containsKey(user);
-      if (!isUserPresent
-          && userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
-        // this is a new user being considered and the number of users
-        // is within limits.
-        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
-        jobsToInitialize.add(job);
-        initializedJobs.put(job.getJobID(),job);
-        countOfJobsInitialized++;
-      } else if (isUserPresent
-          && numberOfJobs < maxJobsPerUserAllowedToInitialize) {
-        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
-        jobsToInitialize.add(job);
-        initializedJobs.put(job.getJobID(),job);
-        countOfJobsInitialized++;
-      }
-      /*
-       * if the maximum number of jobs to initalize for a queue is reached
-       * then we stop looking at further jobs. The jobs beyond this number
-       * can be initialized.
+
+      /** 
+       * Ensure we will not exceed queue limits
        */
-      if(countOfJobsInitialized > maxJobsPerQueueToInitialize) {
+      if (!queue.initializeJobForQueue(job, 
+          countOfJobsInitialized, countOfTasksInitialized)) {
         break;
       }
+      
+      
+      /**
+       *  Ensure we will not exceed user limits
+       */
+      
+      // Ensure we don't process a user's jobs out of order 
+      if (usersOverLimit.contains(user)) {
+        continue;
+      }
+      
+      Integer userJobs = userJobsInitialized.get(user);
+      if (userJobs == null) {
+        userJobs = 0;
+      }
+      Integer userTasks = userTasksInitialized.get(user);
+      if (userTasks == null) {
+        userTasks = 0;
+      }
+      if (!queue.initializeJobForUser(job, user, userJobs, userTasks)) {
+        usersOverLimit.add(user);   // Note down the user
+        continue;
+      }
+      
+      // Ready to initialize!
+      initializedJobs.put(job.getJobID(), job);
+      jobsToInitialize.add(job);
+      
+      // Update queue & user counts
+      countOfJobsInitialized++;
+      countOfTasksInitialized += job.desiredTasks();
+      
+      userJobsInitialized.put(user, userJobs+1);
+      userTasksInitialized.put(user, (userTasks + job.desiredTasks()));
     }
+    
     return jobsToInitialize;
   }
 
@@ -536,7 +527,9 @@ public class JobInitializationPoller ext
           LOG.info("Removing scheduled jobs from waiting queue"
               + job.getJobID());
           jobsIterator.remove();
-          jobQueueManager.removeJobFromWaitingQueue(job);
+          CapacitySchedulerQueue queue = 
+            jobQueueManager.getQueue(job.getProfile().getQueueName());
+          queue.removeWaitingJob(new JobSchedulingInfo(job));
           continue;
         }
       }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Fri Mar  4 04:30:34 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -25,6 +26,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,160 +38,32 @@ import org.apache.hadoop.mapred.JobStatu
  * one or more queues. 
  */
 class JobQueuesManager extends JobInProgressListener {
-
-  /* 
-   * If a queue supports priorities, jobs must be 
-   * sorted on priorities, and then on their start times (technically, 
-   * their insertion time.  
-   * If a queue doesn't support priorities, jobs are
-   * sorted based on their start time.  
-   */
-  
-  // comparator for jobs in queues that don't support priorities
-  private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
-    = new Comparator<JobSchedulingInfo>() {
-    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
-      // the job that started earlier wins
-      if (o1.getStartTime() < o2.getStartTime()) {
-        return -1;
-      } else {
-        return (o1.getStartTime() == o2.getStartTime() 
-                ? o1.getJobID().compareTo(o2.getJobID()) 
-                : 1);
-      }
-    }
-  };
   
-  // class to store queue info
-  private static class QueueInfo {
-
-    // whether the queue supports priorities
-    boolean supportsPriorities;
-    Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
-    Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
-    
-    public Comparator<JobSchedulingInfo> comparator;
-    
-    QueueInfo(boolean prio) {
-      this.supportsPriorities = prio;
-      if (supportsPriorities) {
-        // use the default priority-aware comparator
-        comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
-      }
-      else {
-        comparator = STARTTIME_JOB_COMPARATOR;
-      }
-      waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
-      runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
-    }
-    
-    Collection<JobInProgress> getWaitingJobs() {
-      synchronized (waitingJobs) {
-        return Collections.unmodifiableCollection(
-            new LinkedList<JobInProgress>(waitingJobs.values()));
-      }
-    }
-    
-    int getNumWaitingJobs() {
-      synchronized (waitingJobs) {
-        return waitingJobs.size();
-      }
-    }
-    
-    Collection<JobInProgress> getRunningJobs() {
-      synchronized (runningJobs) {
-       return Collections.unmodifiableCollection(
-           new LinkedList<JobInProgress>(runningJobs.values())); 
-      }
-    }
-    
-    int getNumRunningJobs() {
-      synchronized (runningJobs) {
-        return runningJobs.size();
-      }
-    }
-    
-    void addRunningJob(JobInProgress job) {
-      synchronized (runningJobs) {
-       runningJobs.put(new JobSchedulingInfo(job),job); 
-      }
-    }
-    
-    JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
-      synchronized (runningJobs) {
-        return runningJobs.remove(jobInfo); 
-      }
-    }
-    
-    JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
-      synchronized (waitingJobs) {
-        return waitingJobs.remove(schedInfo);
-      }
-    }
-    
-    void addWaitingJob(JobInProgress job) {
-      synchronized (waitingJobs) {
-        waitingJobs.put(new JobSchedulingInfo(job), job);
-      }
-    }
-    
-    int getWaitingJobCount() {
-      synchronized (waitingJobs) {
-       return waitingJobs.size(); 
-      }
-    }
-    
-  }
-  
-  // we maintain a hashmap of queue-names to queue info
-  private Map<String, QueueInfo> jobQueues = 
-    new HashMap<String, QueueInfo>();
   private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
   private CapacityTaskScheduler scheduler;
+  // Queues in the system
+  private Collection<String> jobQueueNames;
+  private Map<String, CapacitySchedulerQueue> jobQueues = 
+    new HashMap<String, CapacitySchedulerQueue>();
 
   
   JobQueuesManager(CapacityTaskScheduler s) {
     this.scheduler = s;
   }
   
-  /**
-   * create an empty queue with the default comparator
-   * @param queueName The name of the queue
-   * @param supportsPriotities whether the queue supports priorities
-   */
-  public void createQueue(String queueName, boolean supportsPriotities) {
-    jobQueues.put(queueName, new QueueInfo(supportsPriotities));
-  }
-  
-  /**
-   * Returns the queue of running jobs associated with the name
-   */
-  public Collection<JobInProgress> getRunningJobQueue(String queueName) {
-    return jobQueues.get(queueName).getRunningJobs();
-  }
-  
-  public int getNumRunningJobs(String queueName) {
-    return jobQueues.get(queueName).getNumRunningJobs();
-  }
-  
-  /**
-   * Returns the queue of waiting jobs associated with queue name.
-   * 
-   */
-  Collection<JobInProgress> getWaitingJobs(String queueName) {
-    return jobQueues.get(queueName).getWaitingJobs();
-  }
-  
-  public int getNumWaitingJobs(String queueName) {
-    return jobQueues.get(queueName).getNumWaitingJobs();
+  void setQueues(Map<String, CapacitySchedulerQueue> queues) {
+    this.jobQueues = queues;
+    this.jobQueueNames = new ArrayList<String>(queues.keySet());
   }
   
   @Override
   public void jobAdded(JobInProgress job) throws IOException {
-    LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
+    LOG.info("Job " + job.getJobID() + " submitted to queue " + 
+        job.getProfile().getQueueName());
+    
     // add job to the right queue
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
-    if (null == qi) {
+    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
+    if (null == queue) {
       // job was submitted to a queue we're not aware of
       LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
           " specified for job" + job.getProfile().getJobID() + 
@@ -198,7 +72,7 @@ class JobQueuesManager extends JobInProg
     }
     // add job to waiting queue. It will end up in the right place, 
     // based on priority. 
-    qi.addWaitingJob(job);
+    queue.addWaitingJob(job);
     // let scheduler know. 
     scheduler.jobAdded(job);
   }
@@ -208,15 +82,18 @@ class JobQueuesManager extends JobInProg
    * job queue manager.
    */
   private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, 
-                            QueueInfo qi) {
+      CapacitySchedulerQueue queue) {
     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
         + job.getProfile().getQueueName() + " has completed");
     //remove jobs from both queue's a job can be in
     //running and waiting queue at the same time.
-    qi.removeRunningJob(oldInfo);
-    qi.removeWaitingJob(oldInfo);
-    // let scheduler know
-    scheduler.jobCompleted(job);
+    JobInProgress runningJob = queue.removeRunningJob(oldInfo);
+    JobInProgress waitingJob = queue.removeWaitingJob(oldInfo);
+    // let scheduler know if necessary
+    // sometimes this isn't necessary if the job was rejected during submission
+    if (runningJob != null || waitingJob != null) {
+      scheduler.jobCompleted(job);
+    }
   }
   
   // Note that job is removed when the job completes i.e in jobUpated()
@@ -226,27 +103,34 @@ class JobQueuesManager extends JobInProg
   // This is used to reposition a job in the queue. A job can get repositioned 
   // because of the change in the job priority or job start-time.
   private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
-                           QueueInfo qi) {
+      CapacitySchedulerQueue queue) {
     
-    if(qi.removeWaitingJob(oldInfo) != null) {
-      qi.addWaitingJob(job);
+    if(queue.removeWaitingJob(oldInfo) != null) {
+      try {
+        queue.addWaitingJob(job);
+      } catch (IOException ioe) {
+        // Ignore, cannot happen
+        LOG.warn("Couldn't change priority!");
+        return;
+      }
     }
-    if(qi.removeRunningJob(oldInfo) != null) {
-      qi.addRunningJob(job);
+    if(queue.removeRunningJob(oldInfo) != null) {
+      queue.addRunningJob(job);
     }
   }
   
   // This is used to move a job from the waiting queue to the running queue.
   private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, 
-                              QueueInfo qi) {
+                              CapacitySchedulerQueue queue) {
     // Removing of the job from job list is responsibility of the
     //initialization poller.
     // Add the job to the running queue
-    qi.addRunningJob(job);
+    queue.addRunningJob(job);
   }
   
   // Update the scheduler as job's state has changed
-  private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
+  private void jobStateChanged(JobStatusChangeEvent event, 
+                               CapacitySchedulerQueue queue) {
     JobInProgress job = event.getJobInProgress();
     JobSchedulingInfo oldJobStateInfo = 
       new JobSchedulingInfo(event.getOldStatus());
@@ -255,16 +139,16 @@ class JobQueuesManager extends JobInProg
     if (event.getEventType() == EventType.PRIORITY_CHANGED 
         || event.getEventType() == EventType.START_TIME_CHANGED) {
       // Make a priority change
-      reorderJobs(job, oldJobStateInfo, qi);
+      reorderJobs(job, oldJobStateInfo, queue);
     } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
       // Check if the job is complete
       int runState = job.getStatus().getRunState();
       if (runState == JobStatus.SUCCEEDED
           || runState == JobStatus.FAILED
           || runState == JobStatus.KILLED) {
-        jobCompleted(job, oldJobStateInfo, qi);
+        jobCompleted(job, oldJobStateInfo, queue);
       } else if (runState == JobStatus.RUNNING) {
-        makeJobRunning(job, oldJobStateInfo, qi);
+        makeJobRunning(job, oldJobStateInfo, queue);
       }
     }
   }
@@ -272,8 +156,8 @@ class JobQueuesManager extends JobInProg
   @Override
   public void jobUpdated(JobChangeEvent event) {
     JobInProgress job = event.getJobInProgress();
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
-    if (null == qi) {
+    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
+    if (null == queue) {
       // can't find queue for job. Shouldn't happen. 
       LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
           " when updating job " + job.getProfile().getJobID());
@@ -282,26 +166,15 @@ class JobQueuesManager extends JobInProg
     
     // Check if this is the status change
     if (event instanceof JobStatusChangeEvent) {
-      jobStateChanged((JobStatusChangeEvent)event, qi);
+      jobStateChanged((JobStatusChangeEvent)event, queue);
     }
   }
   
-  void removeJobFromWaitingQueue(JobInProgress job) {
-    String queue = job.getProfile().getQueueName();
-    QueueInfo qi = jobQueues.get(queue);
-    qi.removeWaitingJob(new JobSchedulingInfo(job));
+  CapacitySchedulerQueue getQueue(String queue) {
+    return jobQueues.get(queue);
   }
   
-  Comparator<JobSchedulingInfo> getComparator(String queue) {
-    return jobQueues.get(queue).comparator;
-  }
-  
-  int getWaitingJobCount(String queue) {
-    QueueInfo qi = jobQueues.get(queue);
-    return qi.getWaitingJobCount();
-  }
-
-  boolean doesQueueSupportPriorities(String queueName) {
-    return jobQueues.get(queueName).supportsPriorities;
+  Collection<String> getAllQueues() {
+    return Collections.unmodifiableCollection(jobQueueNames);
   }
 }



Mime
View raw message