hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077573 [1/3] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/
Date Fri, 04 Mar 2011 04:30:34 GMT
Author: omalley
Date: Fri Mar  4 04:30:34 2011
New Revision: 1077573

URL: http://svn.apache.org/viewvc?rev=1077573&view=rev
Log:
commit 2845b2fe4208c72af24a511e2bcd98b82e195c58
Author: Arun C Murthy <acmurthy@apache.org>
Date:   Tue Jul 20 23:08:02 2010 -0700

    MAPREDUCE-1872. Hardened CapacityScheduler to have comprehensive, coherent limits on tasks/jobs for jobs/users/queues. Also, added the ability to refresh queue definitions without the need to restart the JobTracker.
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1872. Hardened CapacityScheduler to have comprehensive, coherent
    +    limits on tasks/jobs for jobs/users/queues. Also, added the ability to
    +    refresh queue definitions without the need to restart the JobTracker.
    +    (acmurthy)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar  4 04:30:34 2011
@@ -36,6 +36,8 @@ class CapacitySchedulerConf {
   
   private int defaultUlimitMinimum;
   
+  private float defaultUserLimitFactor;
+  
   private boolean defaultSupportPriority;
   
   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
@@ -98,7 +100,14 @@ class CapacitySchedulerConf {
 
   private Configuration rmConf;
 
-  private int defaultMaxJobsPerUsersToInitialize;
+  private int defaultInitToAcceptJobsFactor;
+  private int defaultMaxActiveTasksPerUserToInitialize;
+  private int defaultMaxActiveTasksPerQueueToInitialize;
+  
+  static final String MAX_SYSTEM_JOBS_KEY = 
+    "mapred.capacity-scheduler.maximum-system-jobs";
+  
+  static final int DEFAULT_MAX_SYSTEM_JOBS = 5000;
   
   /**
    * Create a new Capacity scheduler conf.
@@ -130,13 +139,25 @@ class CapacitySchedulerConf {
    * which is used by the Capacity Scheduler.
    */
   private void initializeDefaults() {
-    defaultUlimitMinimum = rmConf.getInt(
-        "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
+    defaultUlimitMinimum = 
+      rmConf.getInt(
+          "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
+    defaultUserLimitFactor = 
+      rmConf.getFloat("mapred.capacity-scheduler.default-user-limit-factor", 
+          1.0f);
     defaultSupportPriority = rmConf.getBoolean(
         "mapred.capacity-scheduler.default-supports-priority", false);
-    defaultMaxJobsPerUsersToInitialize = rmConf.getInt(
-        "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
-        2);
+    defaultMaxActiveTasksPerQueueToInitialize = 
+      rmConf.getInt(
+          "mapred.capacity-scheduler.default-maximum-active-tasks-per-queue", 
+          200000);
+    defaultMaxActiveTasksPerUserToInitialize = 
+      rmConf.getInt(
+          "mapred.capacity-scheduler.default-maximum-active-tasks-per-user", 
+          100000);
+    defaultInitToAcceptJobsFactor =
+      rmConf.getInt("mapred.capacity-scheduler.default-init-accept-jobs-factor", 
+          10);
   }
   
   /**
@@ -294,6 +315,32 @@ class CapacitySchedulerConf {
   }
   
   /**
+   * Get the factor of queue capacity above which a single user in a queue
+   * can consume resources.
+   * 
+   * @param queue queue name
+   * @return factor of queue capacity above which a single user in a queue
+   *         can consume resources
+   */
+  public float getUserLimitFactor(String queue) {
+    return rmConf.getFloat(toFullPropertyName(queue, "user-limit-factor"), 
+        defaultUserLimitFactor);
+  }
+  
+  /**
+   * Set the factor of queue capacity above which a single user in a queue
+   * can consume resources.
+   * 
+   * @param queue queue name
+   * @param userLimitFactor factor of queue capacity above which a single user 
+   *                        in a queue can consume resources
+   */
+  public void setUserLimitFactor(String queue, float userLimitFactor) {
+    rmConf.setFloat(toFullPropertyName(queue, "user-limit-factor"), 
+        userLimitFactor);
+  }
+  
+  /**
    * Reload configuration by clearing the information read from the 
    * underlying configuration file.
    */
@@ -307,38 +354,81 @@ class CapacitySchedulerConf {
       return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }
 
-  /**
-   * Gets the maximum number of jobs which are allowed to initialize in the
-   * job queue.
-   * 
-   * @param queue queue name.
-   * @return maximum number of jobs allowed to be initialized per user.
-   * @throws IllegalArgumentException if maximum number of users is negative
-   * or zero.
-   */
-  public int getMaxJobsPerUserToInitialize(String queue) {
-    int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue,
-        "maximum-initialized-jobs-per-user"), 
-        defaultMaxJobsPerUsersToInitialize);
-    if(maxJobsPerUser <= 0) {
+  public int getMaxSystemJobs() {
+    int maxSystemJobs = 
+      rmConf.getInt(MAX_SYSTEM_JOBS_KEY, DEFAULT_MAX_SYSTEM_JOBS);
+    if (maxSystemJobs <= 0) {
+      throw new IllegalArgumentException("Invalid maximum system jobs: " + 
+          maxSystemJobs);
+    }
+    
+    return maxSystemJobs;
+  }
+
+  public void setMaxSystemJobs(int maxSystemJobs) {
+    rmConf.setInt(MAX_SYSTEM_JOBS_KEY, maxSystemJobs);
+  }
+  
+  public int getInitToAcceptJobsFactor(String queue) {
+    int initToAccepFactor = 
+      rmConf.getInt(toFullPropertyName(queue, "init-accept-jobs-factor"), 
+          defaultInitToAcceptJobsFactor);
+    if(initToAccepFactor <= 0) {
       throw new IllegalArgumentException(
-          "Invalid maximum jobs per user configuration " + maxJobsPerUser);
+          "Invalid maximum jobs per user configuration " + initToAccepFactor);
     }
-    return maxJobsPerUser;
+    return initToAccepFactor;
+  }
+  
+  public void setInitToAcceptJobsFactor(String queue, int initToAcceptFactor) {
+    rmConf.setInt(toFullPropertyName(queue, "init-accept-jobs-factor"), 
+        initToAcceptFactor);
   }
   
   /**
-   * Sets the maximum number of jobs which are allowed to be initialized 
-   * for a user in the queue.
+   * Get the maximum active tasks per user to be initialized.
    * 
-   * @param queue queue name.
-   * @param value maximum number of jobs allowed to be initialized per user.
+   * @param queue queue name
    */
-  public void setMaxJobsPerUserToInitialize(String queue, int value) {
-    rmConf.setInt(toFullPropertyName(queue, 
-        "maximum-initialized-jobs-per-user"), value);
+  public int getMaxInitializedActiveTasks(String queue) {
+    return rmConf.getInt(toFullPropertyName(queue, 
+                                            "maximum-initialized-active-tasks"), 
+                         defaultMaxActiveTasksPerQueueToInitialize);
   }
-
+  
+  /**
+   * Set the maximum active tasks per user to be initialized.
+   * 
+   * @param queue queue name
+   * @param value maximum active tasks
+   */
+  public void setMaxInitializedActiveTasks(String queue, int value) {
+    rmConf.setInt(toFullPropertyName(queue, "maximum-initialized-active-tasks"), 
+                  value);
+  }
+  
+  /**
+   * Get the maximum active tasks per user to be initialized.
+   * 
+   * @param queue queue name
+   */
+  public int getMaxInitializedActiveTasksPerUser(String queue) {
+    return rmConf.getInt(toFullPropertyName(queue, 
+                                            "maximum-initialized-active-tasks-per-user"), 
+                         defaultMaxActiveTasksPerUserToInitialize);
+  }
+  
+  /**
+   * Set the maximum active tasks per user to be initialized.
+   * 
+   * @param queue queue name
+   * @param value maximum active tasks
+   */
+  public void setMaxInitializedActiveTasksPerUser(String queue, int value) {
+    rmConf.setInt(toFullPropertyName(queue, "maximum-initialized-active-tasks-per-user"), 
+                  value);
+  }
+  
   /**
    * Amount of time in milliseconds which poller thread and initialization
    * thread would sleep before looking at the queued jobs.

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1077573&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Fri Mar  4 04:30:34 2011
@@ -0,0 +1,1061 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.CapacityTaskScheduler.TaskSchedulingMgr;
+import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
+
+
+/***********************************************************************
+ * Keeping track of scheduling information for queues
+ * 
+ * We need to maintain scheduling information relevant to a queue (its 
+ * name, capacity, etc), along with information specific to 
+ * each kind of task, Map or Reduce (num of running tasks, pending 
+ * tasks etc). 
+ * 
+ * This scheduling information is used to decide how to allocate
+ * tasks, redistribute capacity, etc.
+ *  
+ * A QueueSchedulingInfo(QSI) object represents scheduling information for
+ * a  A TaskSchedulingInfo (TSI) object represents scheduling 
+ * information for a particular kind of task (Map or Reduce).
+ *   
+ **********************************************************************/
+class CapacitySchedulerQueue {
+  
+  static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
+  
+  private static class SlotsUsage {
+    /** 
+     * the actual capacity, which depends on how many slots are available
+     * in the cluster at any given time. 
+     */
+    private int capacity = 0;
+    // number of running tasks
+    int numRunningTasks = 0;
+    // number of slots occupied by running tasks
+    int numSlotsOccupied = 0;
+  
+    //the actual maximum capacity which depends on how many slots are available
+    //in cluster at any given time.
+    private int maxCapacity = -1;
+  
+    /**
+     * for each user, we need to keep track of number of slots occupied by
+     * running tasks
+     */
+    Map<String, Integer> numSlotsOccupiedByUser = 
+      new HashMap<String, Integer>();
+  
+    /**
+     * reset the variables associated with tasks
+     */
+    void reset() {
+      numRunningTasks = 0;
+      numSlotsOccupied = 0;
+      numSlotsOccupiedByUser.clear();
+    }
+  
+  
+    /**
+     * Returns the actual capacity.
+     * capacity.
+     *
+     * @return
+     */
+    int getCapacity() {
+      return capacity;
+    }
+  
+    /**
+     * Mutator method for capacity
+     *
+     * @param capacity
+     */
+    void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+  
+    /**
+     * @return the numRunningTasks
+     */
+    int getNumRunningTasks() {
+      return numRunningTasks;
+    }
+  
+    /**
+     * @return the numSlotsOccupied
+     */
+    int getNumSlotsOccupied() {
+      return numSlotsOccupied;
+    }
+  
+    /**
+     * return information about the tasks
+     */
+    @Override
+    public String toString() {
+      float occupiedSlotsAsPercent =
+          getCapacity() != 0 ?
+            ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
+      StringBuffer sb = new StringBuffer();
+      
+      sb.append("Capacity: " + capacity + " slots\n");
+      
+      if(getMaxCapacity() >= 0) {
+        sb.append("Maximum capacity: " + getMaxCapacity() +" slots\n");
+      }
+      sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
+          Integer.valueOf(numSlotsOccupied), Float
+              .valueOf(occupiedSlotsAsPercent)));
+      sb.append(String.format("Running tasks: %d\n", Integer
+          .valueOf(numRunningTasks)));
+      // include info on active users
+      if (numSlotsOccupied != 0) {
+        sb.append("Active users:\n");
+        for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
+            .entrySet()) {
+          if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
+            // user has no tasks running
+            continue;
+          }
+          sb.append("User '" + entry.getKey() + "': ");
+          int numSlotsOccupiedByThisUser = entry.getValue().intValue();
+          float p =
+              (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
+          sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
+              .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
+        }
+      }
+      return sb.toString();
+    }
+  
+    int getMaxCapacity() {
+      return maxCapacity;
+    }
+  
+    void setMaxCapacity(int maxCapacity) {
+      this.maxCapacity = maxCapacity;
+    }
+    
+    int getNumSlotsOccupiedByUser(String user) {
+      Integer slots = numSlotsOccupiedByUser.get(user);
+      return (slots != null) ? slots : 0;
+    }
+
+
+    void updateCapacities(float capacityPercent, float maxCapacityPercent, 
+                          int clusterCapacity) {
+      //compute new capacity
+      setCapacity((int)(capacityPercent*clusterCapacity/100));
+
+      //compute new max map capacities
+      if(maxCapacityPercent > 0) {
+        setMaxCapacity((int)(maxCapacityPercent*clusterCapacity / 100));
+      }
+    }
+    
+    void updateSlotsUsage(String user, int numRunningTasks, int numSlotsOccupied) {
+      this.numRunningTasks += numRunningTasks;
+      this.numSlotsOccupied += numSlotsOccupied;
+      Integer i = this.numSlotsOccupiedByUser.get(user);
+      int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
+      this.numSlotsOccupiedByUser.put(user, slots);
+    }
+  }
+
+  // Queue name
+  final String queueName;
+
+  /**
+   * capacity(%) is set in the config
+   */
+  volatile float capacityPercent = 0;
+  
+  
+  /**
+   * maxCapacityPercent(%) is set in config as
+   * mapred.capacity-scheduler.<queue-name>.maximum-capacity
+   * maximum-capacity percent defines a limit beyond which a queue
+   * cannot expand. Remember this limit is dynamic and changes w.r.t
+   * cluster size.
+   */
+  volatile float maxCapacityPercent = -1;
+  
+  /** 
+   * to handle user limits, we need to know how many users have jobs in 
+   * the 
+   */  
+  Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
+    
+  /**
+   * min value of user limit (same for all users)
+   */
+  volatile int ulMin;
+
+  /**
+   * The factor of queue-capacity above which a single user can consume
+   * queue resources.
+   */
+  volatile float ulMinFactor;
+  
+  /**
+   * We keep a TaskSchedulingInfo object for each kind of task we support
+   */
+  CapacitySchedulerQueue.SlotsUsage mapSlots;
+  CapacitySchedulerQueue.SlotsUsage reduceSlots;
+  
+  /** 
+   * Whether the queue supports priorities.
+   */
+  final boolean supportsPriorities;
+  
+  /**
+   * Information required to track job, user, queue limits 
+   */
+  
+  Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
+  Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
+  
+  /**
+   *  Active tasks in the queue
+   */
+  int activeTasks = 0;
+  
+  /**
+   *  Users in the queue
+   */
+  Map<String, UserInfo> users = new HashMap<String, UserInfo>();
+
+  /**
+   * Comparator for ordering jobs in this queue
+   */
+  public Comparator<JobSchedulingInfo> comparator;
+  
+  int maxJobsToInit;
+  int maxJobsToAccept;
+  int maxJobsPerUserToInit;
+  int maxJobsPerUserToAccept;
+  int maxActiveTasks;
+  int maxActiveTasksPerUser;
+
+  // comparator for jobs in queues that don't support priorities
+  private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
+    = new Comparator<JobSchedulingInfo>() {
+    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
+      // the job that started earlier wins
+      if (o1.getStartTime() < o2.getStartTime()) {
+        return -1;
+      } else {
+        return (o1.getStartTime() == o2.getStartTime() 
+                ? o1.getJobID().compareTo(o2.getJobID()) 
+                : 1);
+      }
+    }
+  };
+
+  public CapacitySchedulerQueue(String queueName, CapacitySchedulerConf conf) {
+    this.queueName = queueName;
+
+    // Do not allow changes to 'supportsPriorities'
+    supportsPriorities = conf.isPrioritySupported(queueName);
+
+    initializeQueue(conf);
+
+    if (supportsPriorities) {
+      // use the default priority-aware comparator
+      comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
+    }
+    else {
+      comparator = STARTTIME_JOB_COMPARATOR;
+    }
+    this.waitingJobs = 
+      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+    this.runningJobs = 
+      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+
+    this.mapSlots = new SlotsUsage();
+    this.reduceSlots = new SlotsUsage();    
+  }
+  
+  synchronized void init(float capacityPercent, float maxCapacityPercent,
+      int ulMin, float ulMinFactor,
+      int maxJobsToInit, int maxJobsPerUserToInit,
+      int maxActiveTasks, int maxActiveTasksPerUser,
+      int maxJobsToAccept, int maxJobsPerUserToAccept) {
+    this.capacityPercent = capacityPercent;
+    this.maxCapacityPercent = maxCapacityPercent;
+    this.ulMin = ulMin;
+    this.ulMinFactor = ulMinFactor;
+    
+    this.maxJobsToInit = maxJobsToInit;
+    this.maxJobsPerUserToInit = maxJobsPerUserToInit; 
+    this.maxActiveTasks = maxActiveTasks;
+    this.maxActiveTasksPerUser = maxActiveTasksPerUser; 
+    this.maxJobsToAccept = maxJobsToAccept;
+    this.maxJobsPerUserToAccept = maxJobsPerUserToAccept;
+    
+    LOG.info("Initialized '" + queueName + "' queue with " +
+        "cap=" + capacityPercent + ", " +
+        "maxCap=" + maxCapacityPercent + ", " +
+        "ulMin=" + ulMin + ", " +
+        "ulMinFactor=" + ulMinFactor + ", " +
+        "supportsPriorities=" + supportsPriorities + ", " +
+        "maxJobsToInit=" + maxJobsToInit + ", " +
+        "maxJobsToAccept=" + maxJobsToAccept + ", " +
+        "maxActiveTasks=" + maxActiveTasks + ", " +
+        "maxJobsPerUserToInit=" + maxJobsPerUserToInit + ", " +
+        "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept + ", " +
+        "maxActiveTasksPerUser=" + maxActiveTasksPerUser
+    );
+  }
+  
+  synchronized void initializeQueue(CapacitySchedulerQueue other) {
+    init(other.capacityPercent, other.maxCapacityPercent, 
+        other.ulMin, other.ulMinFactor, 
+        other.maxJobsToInit, other.maxJobsPerUserToInit, 
+        other.maxActiveTasks, other.maxActiveTasksPerUser, 
+        other.maxJobsToAccept, other.maxJobsPerUserToAccept);
+  }
+  
+  synchronized void initializeQueue(CapacitySchedulerConf conf) {
+    float capacityPercent = conf.getCapacity(queueName);
+    float maxCapacityPercent = conf.getMaxCapacity(queueName);
+    int ulMin = conf.getMinimumUserLimitPercent(queueName);
+    float ulMinFactor = conf.getUserLimitFactor(queueName);
+    
+    int maxSystemJobs = conf.getMaxSystemJobs();
+    int maxJobsToInit = (int)Math.ceil(maxSystemJobs * capacityPercent/100.0);
+    int maxJobsPerUserToInit = 
+      (int)Math.ceil(maxSystemJobs * capacityPercent/100.0 * ulMin/100.0);
+    int maxActiveTasks = conf.getMaxInitializedActiveTasks(queueName);
+    int maxActiveTasksPerUser = 
+      conf.getMaxInitializedActiveTasksPerUser(queueName);
+
+    int jobInitToAcceptFactor = conf.getInitToAcceptJobsFactor(queueName);
+    int maxJobsToAccept = maxJobsToInit * jobInitToAcceptFactor;
+    int maxJobsPerUserToAccept = maxJobsPerUserToInit * jobInitToAcceptFactor;
+    
+    init(capacityPercent, maxCapacityPercent, 
+        ulMin, ulMinFactor, 
+        maxJobsToInit, maxJobsPerUserToInit, 
+        maxActiveTasks, maxActiveTasksPerUser, 
+        maxJobsToAccept, maxJobsPerUserToAccept);
+  }
+
+  /**
+   * @return the queueName
+   */
+  String getQueueName() {
+    return queueName;
+  }
+
+  /**
+   * @return the capacityPercent
+   */
+  float getCapacityPercent() {
+    return capacityPercent;
+  }
+
+  /**
+   * reset the variables associated with tasks
+   */
+  void resetSlotsUsage(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      mapSlots.reset();
+    } else if (taskType == TaskType.REDUCE) {
+      reduceSlots.reset();
+    } else {    
+      throw new IllegalArgumentException("Illegal taskType=" + taskType);
+    }
+  }
+
+
+  /**
+   * Returns the actual capacity in terms of slots for the <code>taskType</code>.
+   * @param taskType
+   * @return actual capacity in terms of slots for the <code>taskType</code>
+   */
+  int getCapacity(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getCapacity();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getCapacity();
+    }
+
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+
+  /**
+   * Get the number of running tasks of the given <code>taskType</code>.
+   * @param taskType
+   * @return
+   */
+  int getNumRunningTasks(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getNumRunningTasks();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getNumRunningTasks();
+    }
+    
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+
+  /**
+   * Get number of slots occupied of the <code>taskType</code>.
+   * @param taskType
+   * @return number of slots occupied of the <code>taskType</code>
+   */
+  int getNumSlotsOccupied(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getNumSlotsOccupied();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getNumSlotsOccupied();
+    }
+    
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+
+  /**
+   * Get maximum number of slots for the <code>taskType</code>.
+   * @param taskType
+   * @return maximum number of slots for the <code>taskType</code>
+   */
+  int getMaxCapacity(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getMaxCapacity();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getMaxCapacity();
+    }
+    
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+
+  /**
+   * Get number of slots occupied by a <code>user</code> of 
+   * <code>taskType</code>.
+   * @param user
+   * @param taskType
+   * @return number of slots occupied by a <code>user</code> of 
+   *         <code>taskType</code>
+   */
+  int getNumSlotsOccupiedByUser(String user, TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getNumSlotsOccupiedByUser(user);
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getNumSlotsOccupiedByUser(user);
+    }
+    
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+  
+  /**
+   * A new job is added to the 
+   * @param job
+   */
+  void jobAdded(JobInProgress job) {
+    // update user-specific info
+    String user = job.getProfile().getUser();
+    
+    Integer i = numJobsByUser.get(user);
+    if (null == i) {
+      i = 1;
+      // set the count for running tasks to 0
+      mapSlots.numSlotsOccupiedByUser.put(user, 0);
+      reduceSlots.numSlotsOccupiedByUser.put(user, 0);
+    }
+    else {
+      i++;
+    }
+    numJobsByUser.put(user, i);
+  }
+  
+  int getNumJobsByUser(String user) {
+    Integer numJobs = numJobsByUser.get(user);
+    return (numJobs != null) ? numJobs : 0;
+  }
+  
+  /**
+   * A job from the queue has completed.
+   * @param job
+   */
+  void jobCompleted(JobInProgress job) {
+    String user = job.getProfile().getUser();
+    // update numJobsByUser
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Job to be removed for user " + user);
+    }
+    Integer i = numJobsByUser.get(job.getProfile().getUser());
+    i--;  // i should never be null!
+    if (0 == i.intValue()) {
+      numJobsByUser.remove(user);
+      // remove job footprint from our TSIs
+      mapSlots.numSlotsOccupiedByUser.remove(user);
+      reduceSlots.numSlotsOccupiedByUser.remove(user);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No more jobs for user, number of users = " + 
+            numJobsByUser.size());
+      }
+    }
+    else {
+      numJobsByUser.put(user, i);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User still has " + i + " jobs, number of users = "
+                + numJobsByUser.size());
+      }
+    }
+  }
+  
+  /**
+   * Update queue usage.
+   * @param type
+   * @param user
+   * @param numRunningTasks
+   * @param numSlotsOccupied
+   */
+  void update(TaskType type, String user, 
+      int numRunningTasks, int numSlotsOccupied) {
+    if (type == TaskType.MAP) {
+      mapSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
+    } else if (type == TaskType.REDUCE) {
+      reduceSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
+    }
+  }
+  
+  /**
+   * Update queue usage across all running jobs.
+   * @param mapClusterCapacity
+   * @param reduceClusterCapacity
+   * @param mapScheduler
+   * @param reduceScheduler
+   */
+  void updateAll(int mapClusterCapacity, int reduceClusterCapacity, 
+      TaskSchedulingMgr mapScheduler, TaskSchedulingMgr reduceScheduler) {
+   // Compute new capacities for maps and reduces
+    mapSlots.updateCapacities(capacityPercent, maxCapacityPercent, 
+        mapClusterCapacity);
+    reduceSlots.updateCapacities(capacityPercent, maxCapacityPercent, 
+        reduceClusterCapacity);
+
+    // reset running/pending tasks, tasks per user
+    resetSlotsUsage(TaskType.MAP);
+    resetSlotsUsage(TaskType.REDUCE);
+    
+    Collection<JobInProgress> jobs = getRunningJobs(); // Safe to iterate since
+                                                       // we get a copy here
+    for (JobInProgress j : jobs) {
+      if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+        continue;
+      }
+
+      int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
+      int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
+      int numRunningMapSlots = 
+        numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
+      int numRunningReduceSlots =
+        numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
+      int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
+      int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
+      int numReservedMapSlotsForThisJob = 
+        (mapScheduler.getNumReservedTaskTrackers(j) * 
+         mapScheduler.getSlotsPerTask(j)); 
+      int numReservedReduceSlotsForThisJob = 
+        (reduceScheduler.getNumReservedTaskTrackers(j) * 
+         reduceScheduler.getSlotsPerTask(j)); 
+      
+      j.setSchedulingInfo(
+          CapacityTaskScheduler.getJobQueueSchedInfo(numMapsRunningForThisJob, 
+              numRunningMapSlots,
+              numReservedMapSlotsForThisJob,
+              numReducesRunningForThisJob, 
+              numRunningReduceSlots,
+              numReservedReduceSlotsForThisJob));
+
+      update(TaskType.MAP, j.getProfile().getUser(), 
+          numMapsRunningForThisJob, numMapSlotsForThisJob);
+      update(TaskType.REDUCE, j.getProfile().getUser(), 
+          numReducesRunningForThisJob, numReduceSlotsForThisJob);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(queueName + " - updateQSI: job %s: run(m)=%d, "
+            + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
+            + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
+            + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
+            .getJobID().toString(), Integer
+            .valueOf(numMapsRunningForThisJob), Integer
+            .valueOf(numMapSlotsForThisJob), Integer
+            .valueOf(numReducesRunningForThisJob), Integer
+            .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
+            .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
+            .valueOf(j.failedMapTasks),
+            Integer.valueOf(j.failedReduceTasks), Integer
+                .valueOf(j.speculativeMapTasks), Integer
+                .valueOf(j.speculativeReduceTasks), Integer
+                .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
+      }
+    }
+  }
+  
+  boolean doesQueueSupportPriorities() {
+    return supportsPriorities;
+  }
+
+  /**
+   * return information about the queue
+   *
+   * @return a String representing the information about the 
+   */
+  @Override
+  public String toString(){
+    // We print out the queue information first, followed by info
+    // on map and reduce tasks and job info
+    StringBuilder sb = new StringBuilder();
+    sb.append("Queue configuration\n");
+    sb.append("Capacity Percentage: ");
+    sb.append(capacityPercent);
+    sb.append("%\n");
+    sb.append("User Limit: " + ulMin + "%\n");
+    sb.append("Priority Supported: " +
+        (doesQueueSupportPriorities() ? "YES":"NO") + "\n");
+    sb.append("-------------\n");
+
+    sb.append("Map tasks\n");
+    sb.append(mapSlots.toString());
+    sb.append("-------------\n");
+    sb.append("Reduce tasks\n");
+    sb.append(reduceSlots.toString());
+    sb.append("-------------\n");
+    
+    sb.append("Job info\n");
+    sb.append("Number of Waiting Jobs: " + getNumWaitingJobs() + "\n");
+    sb.append("Number of users who have submitted jobs: " + 
+        numJobsByUser.size() + "\n");
+    return sb.toString();
+  }
+  
+  /**
+   * Functionality to deal with job initialization
+   */
+
+  
+  // per-user information
+  static class UserInfo {
+    int runningJobs;
+    int waitingJobs;
+    
+    int activeTasks;
+    
+    int getNumRunningJobs() {
+      return runningJobs;
+    }
+    
+    int getNumWaitingJobs() {
+      return waitingJobs;
+    }
+    
+    int getNumActiveTasks() {
+      return activeTasks;
+    }
+    
+    public void jobAdded(JobInProgress job) {
+      ++waitingJobs;
+    }
+    
+    public void jobInitialized(JobInProgress job) {
+      --waitingJobs;
+      
+      ++runningJobs;
+      activeTasks += job.desiredTasks();
+    }
+    
+    public void jobCompleted(JobInProgress job) {
+      --runningJobs;
+      activeTasks -= job.desiredTasks();
+    }
+    
+    boolean isInactive() {
+      return activeTasks == 0 && runningJobs == 0  && waitingJobs == 0;
+    }
+  }
+
+  synchronized Collection<JobInProgress> getWaitingJobs() {
+    return Collections.unmodifiableCollection(
+        new LinkedList<JobInProgress>(waitingJobs.values()));
+  }
+  
+  synchronized Collection<JobInProgress> getRunningJobs() {
+    return Collections.unmodifiableCollection(
+        new LinkedList<JobInProgress>(runningJobs.values())); 
+  }
+  
+  synchronized int getNumActiveTasks() {
+    return activeTasks;
+  }
+  
+  synchronized int getNumRunningJobs() {
+    return runningJobs.size();
+  }
+  
+  synchronized int getNumRunningJobsByUser(String user) {
+    UserInfo userInfo = users.get(user);
+    return (userInfo == null) ? 0 : userInfo.getNumRunningJobs();
+  }
+
+  synchronized int getNumActiveTasksByUser(String user) {
+    UserInfo userInfo = users.get(user);
+    return (userInfo == null) ? 0 : userInfo.getNumActiveTasks();
+  }
+
+  synchronized int getNumWaitingJobsByUser(String user) {
+    UserInfo userInfo = users.get(user);
+    return (userInfo == null) ? 0 : userInfo.getNumWaitingJobs();
+  }
+
+  synchronized void addRunningJob(JobInProgress job) {
+    JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job);
+
+    if (runningJobs.containsKey(jobSchedInfo)) {
+      LOG.info("job " + job.getJobID() + " already running in queue'" +
+          queueName + "'!");
+      return;
+    }
+
+    runningJobs.put(jobSchedInfo,job);
+
+    // Update queue stats
+    activeTasks += job.desiredTasks();
+    
+    // Update user stats
+    String user = job.getProfile().getUser();
+    UserInfo userInfo = users.get(user);
+    userInfo.jobInitialized(job);
+  }
+  
+  synchronized JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
+    JobInProgress job = runningJobs.remove(jobInfo); 
+
+    // We have to be careful, we might be trying to remove a job  
+    // which might not have been initialized
+    if (job != null) {
+      // Update user stats
+      String user = job.getProfile().getUser();
+      synchronized (users) {
+        UserInfo userInfo = users.get(user);
+
+        synchronized (userInfo) {
+          userInfo.jobCompleted(job);
+
+          if (userInfo.isInactive()) {
+            users.remove(userInfo);
+          }
+        }
+      }
+
+      // Update queue stats
+      activeTasks -= job.desiredTasks();
+    }
+
+    return job;
+  }
+  
+  synchronized void addWaitingJob(JobInProgress job) throws IOException {
+    JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job);
+    if (waitingJobs.containsKey(jobSchedInfo)) {
+      LOG.info("job " + job.getJobID() + " already waiting in queue '" + 
+          queueName + "'!");
+      return;
+    }
+    
+    String user = job.getProfile().getUser();
+
+    // Check acceptance limits
+    checkJobSubmissionLimits(job, user);
+    
+    waitingJobs.put(new JobSchedulingInfo(job), job);
+    
+    // Update user stats
+    UserInfo userInfo = users.get(user);
+    if (userInfo == null) {
+      userInfo = new UserInfo();
+      users.put(user, userInfo);
+    }
+    userInfo.jobAdded(job);
+  }
+  
+  synchronized JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
+    return waitingJobs.remove(schedInfo);
+  }
+
+  synchronized int getNumActiveUsers() {
+    return users.size();
+  }
+  
+  synchronized int getNumWaitingJobs() {
+    return waitingJobs.size(); 
+  } 
+  
+  Comparator<JobSchedulingInfo> getComparator() {
+    return comparator;
+  }
+  
+  /**
+   * Functions to deal with queue-limits.
+   */
+  
+  /**
+   * Check if the queue can be assigned <code>numSlots</code> 
+   * of the given <code>taskType</code> so that the queue doesn't exceed its
+   * configured maximum-capacity.
+   * 
+   * @param taskType
+   * @param numSlots
+   * @return <code>true</code> if slots can be assigned
+   */
+  boolean assignSlotsToQueue(TaskType taskType, int numSlots) {
+    // Check if the queue is running over it's maximum-capacity
+    if (getMaxCapacity(taskType) > 0) {  // Check if max capacity is enabled
+        if ((getNumSlotsOccupied(taskType) + numSlots) > 
+             getMaxCapacity(taskType)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "Queue " + queueName + " " + "has reached its  max " + 
+                taskType + " capacity");
+            LOG.debug("Current running tasks " + getCapacity(taskType));
+          }
+          return false;
+        }
+      }
+    
+    return true;
+  }
+  /**
+   * Check if the given <code>job</code> and <code>user</code> and 
+   * queue can be assigned the requested number of slots of 
+   * the given <code>taskType</code> for the .
+   * 
+   * This checks to ensure that queue and user are under appropriate limits.
+   * 
+   * @param taskType
+   * @param job
+   * @param user
+   * @return <code>true</code> if the given job/user/queue can be assigned 
+   * the requested number of slots, <code>false</code> otherwise
+   */
+  boolean assignSlotsToJob(TaskType taskType, JobInProgress job, String user) {
+    // Check to ensure we will not go over the queue's max-capacity
+    if (!assignSlotsToQueue(taskType, job.getNumSlotsPerTask(taskType))) {
+      return false;
+    }
+    
+    // what is our current capacity? It is equal to the queue-capacity if
+    // we're running below capacity. If we're running over capacity, then its
+    // #running plus slotPerTask of the job (which is the number of extra
+    // slots we're getting).
+    int currentCapacity;
+    int queueCapacity = getCapacity(taskType);
+    if (getNumSlotsOccupied(taskType) < queueCapacity) {
+      currentCapacity = queueCapacity;
+    }
+    else {
+      currentCapacity = 
+        getNumSlotsOccupied(taskType) + job.getNumSlotsPerTask(taskType);
+    }
+    
+    // Never allow a single user to take more than the 
+    // queue's configured capacity * user-limit-factor.
+    // Also, the queue's configured capacity should be higher than 
+    // queue-hard-limit * ulMin
+    int limit = 
+      Math.min(
+          Math.max(divideAndCeil(currentCapacity, numJobsByUser.size()), 
+                   divideAndCeil(ulMin*currentCapacity, 100)),
+          (int)(queueCapacity * ulMinFactor)
+          );
+    if (getNumSlotsOccupiedByUser(user, taskType) >= limit) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User " + user + " is over limit, num slots occupied=" + 
+            getNumSlotsOccupiedByUser(user, taskType) + 
+            ", limit=" + limit);
+      }
+      return false;
+    }
+
+    return true;
+  }
+  
+  /**
+   * Ceil of result of dividing two integers.
+   * 
+   * This is *not* a utility method. 
+   * Neither <code>a</code> or <code>b</code> should be negative.
+   *  
+   * @param a
+   * @param b
+   * @return ceil of the result of a/b
+   */
+  private static int divideAndCeil(int a, int b) {
+    if (b == 0) {
+      LOG.info("divideAndCeil called with a=" + a + " b=" + b);
+      return 0;
+    }
+    return (a + (b - 1)) / b;
+  }
+
+  /**
+   * Check if the given <code>job</code> can be accepted to the 
+   * queue on behalf of the <code>user</code>.
+   * @param job 
+   * @param user
+   * @return <code>true</code> if the job can be accepted, 
+   *         <code>false</code> otherwise
+   */
+  synchronized void checkJobSubmissionLimits(JobInProgress job, String user) 
+  throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checkJobSubmissionLimits - " +
+          "qWaitJobs=" + getNumWaitingJobs() + " " +
+          "qRunJobs=" + getNumRunningJobs() + " " +
+          "maxJobsToAccept=" + maxJobsToAccept +
+          "user=" + user + " " +
+          "uWaitJobs=" +  getNumWaitingJobsByUser(user) + " " +
+          "uRunJobs=" + getNumRunningJobsByUser(user)  + " " +
+          "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept + " " +
+          "");
+    }
+    
+    // Across all jobs in queue
+    if ((getNumWaitingJobs() + getNumRunningJobs()) >= maxJobsToAccept) {
+      throw new IOException(
+          "Job '" + job.getJobID() + "' from user '" + user  + 
+          "' rejected since queue '" + queueName + 
+          "' already has " + getNumWaitingJobs() + " waiting jobs and " + 
+          getNumRunningJobs() + " running jobs and exceeds limit of " +
+          maxJobsToAccept + " jobs to accept");
+    }
+    
+    // Across all jobs of the user
+    if ((getNumWaitingJobsByUser(user) + getNumRunningJobsByUser(user)) >= 
+        maxJobsPerUserToAccept) {
+      throw new IOException(
+          "Job '" + job.getJobID() + "' rejected since user '" + user +  
+          "' already has " + getNumWaitingJobsByUser(user) + " waiting jobs" +
+          " and " + getNumRunningJobsByUser(user) + " running jobs," +
+          " it exceeds limit of " + maxJobsToAccept + " jobs to accept" +
+          " in queue '" + queueName + "'");
+    }
+  }
+  
+  /**
+   * Check if the <code>job</code> can be initialized in the queue.
+   * 
+   * @param job
+   * @param currentlyInitializedJobs
+   * @param currentlyInitializedTasks
+   * @return <code>true</code> if the job can be initialized, 
+   *         <code>false</code> otherwise
+   */
+  synchronized boolean initializeJobForQueue(JobInProgress job,
+      int currentlyInitializedJobs, int currentlyInitializedTasks) {
+    
+    // Check if queue has sufficient number of jobs
+    int runningJobs = getNumRunningJobs();
+    if ((runningJobs + currentlyInitializedJobs) >= maxJobsToInit) {
+      LOG.info(getQueueName() + " already has " + runningJobs + 
+          " running jobs and " + currentlyInitializedJobs + " jobs about to be" +
+          " initialized, cannot initialize " + job.getJobID() + 
+          " since it will exceeed limit of " + 
+          maxJobsToInit + " initialized jobs for this queue");
+      return false;
+    }
+    
+    // Check if queue has too many active tasks
+    if ((activeTasks + currentlyInitializedTasks + job.desiredTasks()) >= 
+         maxActiveTasks) {
+      LOG.info("Queue '" + getQueueName() + "' has " + activeTasks + 
+          " active tasks and " + currentlyInitializedTasks + " tasks about to" +
+          " be initialized, cannot initialize job '" + job.getJobID() + 
+          "' for user '" + job.getProfile().getUser() + "' with " +
+          job.desiredTasks() + " tasks since it will exceed limit of " + 
+          maxActiveTasks + " active tasks for this queue");
+      return false;
+    }
+    
+    return true;
+  }
+  
+  /**
+   * Check if the <code>job</code> can be initialized in the queue
+   * on behalf of the <code>user</code>.
+   * 
+   * @param job
+   * @param user
+   * @param currentlyInitializedJobs
+   * @param currentlyInitializedTasks
+   * @return <code>true</code> if the job can be initialized, 
+   *         <code>false</code> otherwise
+   */
+  synchronized boolean initializeJobForUser(JobInProgress job, 
+      String user, int currentlyInitializedJobs, 
+      int currentlyInitializedTasks) {
+    
+    // Check if the user has too many jobs
+    int userRunningJobs = getNumRunningJobsByUser(user);
+    if ((userRunningJobs + currentlyInitializedJobs) >=
+        maxJobsPerUserToInit) {
+      LOG.info(getQueueName() + " already has " + userRunningJobs + 
+          " running jobs and " + currentlyInitializedJobs + " jobs about to be" +
+          " initialized, for user " + user + 
+          "; cannot initialize " + job.getJobID() + 
+          " since it will exceeed limit of " + 
+          maxJobsPerUserToInit + 
+          " initialized jobs per user for this queue");
+      return false;
+    }
+    
+    // Check if the user has too many active tasks
+    int userActiveTasks = getNumActiveTasksByUser(user);
+    if ((userActiveTasks + currentlyInitializedTasks) >= 
+        maxActiveTasksPerUser) {
+      LOG.info(getQueueName() + " has " + userActiveTasks + 
+          " active tasks and " + currentlyInitializedTasks + " tasks about to" +
+          " be initialized for user " + user + 
+          ", cannot initialize " + job.getJobID() + " with " +
+          job.desiredTasks() + " tasks since it will exceed limit of " + 
+          maxActiveTasksPerUser + 
+          " active tasks per user for this queue");
+      return false;
+    }
+    
+    return true;
+  }
+
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java Fri Mar  4 04:30:34 2011
@@ -30,8 +30,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.hadoop.mapred.CapacityTaskScheduler.QueueSchedulingInfo;
-import org.apache.hadoop.mapred.CapacityTaskScheduler.TaskSchedulingInfo;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapred.JobHistory.JobInfo;
 import org.apache.hadoop.util.StringUtils;
 
@@ -121,34 +120,33 @@ public class CapacitySchedulerServlet ex
                 "<th>Reduce Task Used Capacity</th>" +
                 "<th>Running Reduces </tr>\n");
       JobQueuesManager queuesManager = scheduler.getJobQueuesManager();
-      for (QueueSchedulingInfo qsi : scheduler.getQueueInfoMap().values()) {
-        String queueName = qsi.getQueueName();
-        TaskSchedulingInfo maptsi = scheduler.getMapScheduler().getTSI(qsi);
-        TaskSchedulingInfo redtsi = scheduler.getReduceScheduler().getTSI(qsi);
+      for (CapacitySchedulerQueue queue : scheduler.getQueueInfoMap().values()) {
+        String queueName = queue.getQueueName();
         out.print("<tr>\n");
         out.printf(
             "<td><a href=\"jobqueue_details.jsp?queueName=%s\">%s</a></td>\n",
             queueName, queueName);
-        out.printf("<td>%s</td>\n", queuesManager.getNumRunningJobs(queueName));
-        out.printf("<td>%s</td>\n", queuesManager.getNumWaitingJobs(queueName));
-        out.printf("<td>%.1f%%</td>\n", qsi.getCapacityPercent());
-        int mapCapacity = maptsi.getCapacity();
-        int mapSlotsOccupied = maptsi.getNumSlotsOccupied();
-        int reduceSlotsOccupied = redtsi.getNumSlotsOccupied();
+        out.printf("<td>%s</td>\n", queue.getNumRunningJobs());
+        out.printf("<td>%s</td>\n", queue.getNumWaitingJobs());
+        out.printf("<td>%.1f%%</td>\n", queue.getCapacityPercent());
+        int mapCapacity = queue.getCapacity(TaskType.MAP);
+        int mapSlotsOccupied = queue.getNumSlotsOccupied(TaskType.MAP);
+        int reduceSlotsOccupied = queue.getNumSlotsOccupied(TaskType.REDUCE);
         float occupiedSlotsAsPercent = 
             mapCapacity != 0 ? ((float) mapSlotsOccupied * 100 / mapCapacity)
             : 0;
         out.printf("<td>%s</td>\n", mapCapacity);
         out.printf("<td>%s (%.1f%% of Capacity)</td>\n", mapSlotsOccupied,
             occupiedSlotsAsPercent);
-        out.printf("<td>%s</td>\n", maptsi.getNumRunningTasks());
-        int reduceCapacity = redtsi.getCapacity();
-        float redOccupiedSlotsAsPercent = reduceCapacity != 0 ? ((float) reduceSlotsOccupied * 100 / mapCapacity)
-            : 0;
+        out.printf("<td>%s</td>\n", queue.getNumRunningTasks(TaskType.MAP));
+        int reduceCapacity = queue.getCapacity(TaskType.REDUCE);
+        float redOccupiedSlotsAsPercent = 
+          (reduceCapacity != 0 ? ((float)reduceSlotsOccupied*100 / mapCapacity)
+            : 0);
         out.printf("<td>%s</td>\n", reduceCapacity);
         out.printf("<td>%s (%.1f%% of Capacity)</td>\n", reduceSlotsOccupied,
             redOccupiedSlotsAsPercent);
-        out.printf("<td>%s</td>\n", redtsi.getNumRunningTasks());
+        out.printf("<td>%s</td>\n", queue.getNumRunningTasks(TaskType.REDUCE));
       }
       out.print("</table>\n");
     }



Mime
View raw message