hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077031 - in /hadoop/common/branches/branch-0.20-security-patches: conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content...
Date Fri, 04 Mar 2011 03:31:51 GMT
Author: omalley
Date: Fri Mar  4 03:31:50 2011
New Revision: 1077031

URL: http://svn.apache.org/viewvc?rev=1077031&view=rev
Log:
commit ebf9b7a20a18ec8b7b6c86a26becdb7020726cee
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date:   Wed Oct 21 23:06:57 2009 +0530

    MAPREDUCE:1105 from https://issues.apache.org/jira/secure/attachment/12422823/MAPREDUCE-1105-yahoo-version20-5.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1105. Remove max limit configuration in capacity scheduler in
    +    favor of max capacity percentage thus allowing the limit to go over
    +    queue capacity. (Rahul Kumar Singh via yhemanth)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml

Modified: hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template (original)
+++ hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template Fri Mar  4 03:31:50 2011
@@ -16,6 +16,25 @@
   </property>
   
   <property>
+    <name>mapred.capacity-scheduler.queue.default.maximum-capacity</name>
+    <value>-1</value>
+    <description>
+        maximum-capacity defines a limit beyond which a queue cannot use the capacity of the cluster.
+        This provides a means to limit how much excess capacity a queue can use. By default, there is no limit.
+        The maximum-capacity of a queue can only be greater than or equal to its minimum capacity.
+        The default value of -1 implies a queue can use the complete capacity of the cluster.
+
+        This property could be used to prevent certain long-running jobs from occupying more than a
+        certain percentage of the cluster, which, in the absence of pre-emption, could affect the
+        capacity guarantees of other queues.
+        
+        One important thing to note is that maximum-capacity is a percentage, so the max capacity
+        changes with the cluster's capacity. If a large number of nodes or racks are added to the cluster,
+        the max capacity in absolute terms increases accordingly.
+    </description>    
+  </property>
+  
+  <property>
     <name>mapred.capacity-scheduler.queue.default.supports-priority</name>
     <value>false</value>
     <description>If true, priorities of jobs will be taken into 
@@ -46,44 +65,6 @@
     </description>
   </property>
 
-<property>
-  <name>mapred.capacity-scheduler.queue.default.max.map.slots</name>
-  <value>-1</value>
-  <description>
-    This value is the maximum map slots that can be used in a
-    queue at any point of time. So for example assuming above config value
-    is 100 , not more than 100 tasks would be in the queue at any point of
-    time, assuming each task takes one slot.
-
-    Default value of -1 would disable this capping feature
-
-    Typically the queue capacity should be equal to this limit.
-    If queue capacity is more than this limit, excess capacity will be
-    used by the other queues. If queue capacity is less than the above
-    limit , then the limit would be the queue capacity - as in the current
-    implementation
-  </description>
-</property>
-
-<property>
-  <name>mapred.capacity-scheduler.queue.default.max.reduce.slots</name>
-  <value>-1</value>
-  <description>
-    This value is the maximum reduce slots that can be used in a
-    queue at any point of time. So for example assuming above config value
-      is 100 , not more than 100 reduce tasks would be in the queue at any point
-      of time, assuming each task takes one slot.
-
-    Default value of -1 would disable this capping feature
-
-    Typically the queue capacity should be equal to this limit.
-    If queue capacity is more than this limit, excess capacity will be
-    used by the other queues. If queue capacity is less than the above
-    limit , then the limit would be the queue capacity - as in the current
-    implementation
-  </description>
-</property>
-  
   <!-- The default configuration settings for the capacity task scheduler -->
   <!-- The default values would be applied to all the queues which don't have -->
   <!-- the appropriate property for the particular queue -->

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar  4 03:31:50 2011
@@ -75,17 +75,14 @@ class CapacitySchedulerConf {
   static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
     "mapred.capacity-scheduler.task.limit.maxpmem";
 
-  /**
-   *  Configuration that provides the maximum cap for the map task in a queue
-   *  at any given point of time.
-   */
-  static final String MAX_MAP_CAP_PROPERTY = "max.map.slots";
+
+  private static final String CAPACITY_PROPERTY = "capacity";
 
   /**
-   *  Configuration that provides the maximum cap for the reduce task in a queue
-   *  at any given point of time.
+   * A maximum capacity defines a limit beyond which a queue
+   * cannot expand.
    */
-  static final String MAX_REDUCE_CAP_PROPERTY = "max.reduce.slots";
+   static final String MAX_CAPACITY_PROPERTY ="maximum-capacity";
 
   /**
    * The constant which defines the default initialization thread
@@ -104,9 +101,9 @@ class CapacitySchedulerConf {
   private int defaultMaxJobsPerUsersToInitialize;
   
   /**
-   * Create a new ResourceManagerConf.
+   * Create a new Capacity scheduler conf.
    * This method reads from the default configuration file mentioned in
-   * {@link RM_CONF_FILE}, that must be present in the classpath of the
+   * {@link SCHEDULER_CONF_FILE}, that must be present in the classpath of the
    * application.
    */
   public CapacitySchedulerConf() {
@@ -116,7 +113,7 @@ class CapacitySchedulerConf {
   }
 
   /**
-   * Create a new ResourceManagerConf reading the specified configuration
+   * Create a new Capacity scheduler conf reading the specified configuration
    * file.
    * 
    * @param configFile {@link Path} to the configuration file containing
@@ -163,16 +160,15 @@ class CapacitySchedulerConf {
     //In case of both capacity and default capacity not configured.
     //Last check is if the configuration is specified and is marked as
     //negative we throw exception
-    String raw = rmConf.getRaw(toFullPropertyName(queue, 
-        "capacity"));
+    String raw = rmConf.getRaw(toFullPropertyName(queue, CAPACITY_PROPERTY));
     if(raw == null) {
       return -1;
     }
-    float result = rmConf.getFloat(toFullPropertyName(queue, 
-                                   "capacity"), 
-                                   -1);
+    float result = rmConf.getFloat(
+      toFullPropertyName(queue, CAPACITY_PROPERTY), -1);
     if (result < 0.0 || result > 100.0) {
-      throw new IllegalArgumentException("Illegal capacity for queue " + queue +
+      throw new IllegalArgumentException(
+        "Illegal capacity for queue " + queue +
                                          " of " + result);
     }
     return result;
@@ -185,7 +181,53 @@ class CapacitySchedulerConf {
    * @param capacity percent of the cluster for the queue.
    */
   public void setCapacity(String queue,float capacity) {
-    rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity);
+    rmConf.setFloat(toFullPropertyName(queue, CAPACITY_PROPERTY),capacity);
+  }
+
+  /**
+   * Return the maximum percentage of the cluster capacity that can be used by
+   * the given queue.
+   * This percentage defines a limit beyond which a
+   * queue cannot use the capacity of cluster.
+   * This provides a means to limit how much excess capacity a
+   * queue can use. By default, there is no limit.
+   *
+   * The maximum-capacity of a queue can only be
+   * greater than or equal to its minimum capacity.
+   *
+   * @param queue name of the queue.
+   * @return maximum-capacity for the given queue
+   */
+  public float getMaxCapacity(String queue) {
+    float result = rmConf.getFloat(
+      toFullPropertyName(queue, MAX_CAPACITY_PROPERTY), -1);
+
+    //if result is 0 or less than 0 set it to -1
+    result = (result <= 0) ? -1 : result;
+
+    if (result > 100.0) {
+      throw new IllegalArgumentException(
+        "Illegal " + MAX_CAPACITY_PROPERTY +
+          " for queue " + queue + " of " + result);
+    }
+
+    if ((result != -1) && (result < getCapacity(queue))) {
+      throw new IllegalArgumentException(
+        MAX_CAPACITY_PROPERTY + " " + result +
+          " for a queue should be greater than or equal to capacity ");
+    }
+    return result;
+  }
+
+    /**
+   * Sets the maxCapacity of the given queue.
+   *
+   * @param queue name of the queue
+   * @param maxCapacity percent of the cluster for the queue.
+   */
+  public void setMaxCapacity(String queue,float maxCapacity) {
+      rmConf.setFloat(
+        toFullPropertyName(queue, MAX_CAPACITY_PROPERTY), maxCapacity);
   }
   
   /**
@@ -369,40 +411,4 @@ class CapacitySchedulerConf {
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
-
-  /**
-   * get the max map slots cap
-   * @param queue
-   * @return
-   */
-  public int getMaxMapCap(String queue) {
-    return rmConf.getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
-  }
-
-  /**
-   * Used for testing
-   * @param queue
-   * @param val
-   */
-  public void setMaxMapCap(String queue,int val) {
-    rmConf.setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
-  }
-
-  /**
-   * get the max reduce slots cap
-   * @param queue
-   * @return
-   */
-  public int getMaxReduceCap(String queue) {
-    return rmConf.getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);    
-  }
-
-  /**
-   * Used for testing
-   * @param queue
-   * @param val
-   */
-  public void setMaxReduceCap(String queue,int val) {
-    rmConf.setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
-  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 03:31:50 2011
@@ -31,8 +31,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
-import org.apache.hadoop.mapred.TaskTrackerStatus;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
@@ -77,10 +75,7 @@ class CapacityTaskScheduler extends Task
    **********************************************************************/
 
   private static class TaskSchedulingInfo {
-    
-    private static final String LIMIT_NORMALIZED_CAPACITY_STRING
-      = "(Capacity is restricted to max limit of %d slots.\n" +
-        "Remaining %d slots will be used by other queues.)\n";
+
     /** 
      * the actual capacity, which depends on how many slots are available
      * in the cluster at any given time. 
@@ -91,14 +86,9 @@ class CapacityTaskScheduler extends Task
     // number of slots occupied by running tasks
     int numSlotsOccupied = 0;
 
-    /**
-     * max task limit
-     * This value is the maximum slots that can be used in a
-     * queue at any point of time. So for example assuming above config value
-     * is 100 , not more than 100 tasks would be in the queue at any point of
-     * time, assuming each task takes one slot.
-     */
-    private int maxTaskLimit = -1;
+    //the actual maximum capacity which depends on how many slots are available
+    //in cluster at any given time.
+    private int maxCapacity = -1;
 
     /**
      * for each user, we need to keep track of number of slots occupied by
@@ -119,26 +109,19 @@ class CapacityTaskScheduler extends Task
     }
 
 
-    int getMaxTaskLimit() {
-      return maxTaskLimit;
-    }
-
-    void setMaxTaskLimit(int maxTaskCap) {
-      this.maxTaskLimit = maxTaskCap;
-    }
-
     /**
-     * This method checks for maxTaskLimit and sends minimum of maxTaskLimit and
+     * Returns the actual capacity.
      * capacity.
+     *
      * @return
      */
     int getCapacity() {
-      return ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) ? maxTaskLimit :
-        capacity;
+      return capacity;
     }
 
     /**
      * Mutator method for capacity
+     *
      * @param capacity
      */
     void setCapacity(int capacity) {
@@ -157,13 +140,9 @@ class CapacityTaskScheduler extends Task
       StringBuffer sb = new StringBuffer();
       
       sb.append("Capacity: " + capacity + " slots\n");
-      //If maxTaskLimit is less than the capacity
-      if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
-        sb.append(String.format(LIMIT_NORMALIZED_CAPACITY_STRING,   
-                        maxTaskLimit, (capacity-maxTaskLimit)));
-      }
-      if (maxTaskLimit >= 0) {
-        sb.append(String.format("Maximum Slots Limit: %d\n", maxTaskLimit));
+      
+      if(getMaxCapacity() >= 0) {
+        sb.append("Maximum capacity: " + getMaxCapacity() +" slots\n");
       }
       sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
           Integer.valueOf(numSlotsOccupied), Float
@@ -189,21 +168,42 @@ class CapacityTaskScheduler extends Task
       }
       return sb.toString();
     }
+
+    int getMaxCapacity() {
+      return maxCapacity;
+    }
+
+    void setMaxCapacity(int maxCapacity) {
+      this.maxCapacity = maxCapacity;
+    }
   }
   
   private static class QueueSchedulingInfo {
     String queueName;
 
-    /** capacity(%) is set in the config */ 
+    /**
+     * capacity(%) is set in the config
+     */
     float capacityPercent = 0;
     
+    
+    /**
+     * maxCapacityPercent(%) is set in config as
+     * mapred.capacity-scheduler.queue.<queue-name>.maximum-capacity
+     * maximum-capacity percent defines a limit beyond which a queue
+     * cannot expand. Remember this limit is dynamic and changes w.r.t.
+     * cluster size.
+     */
+    float maxCapacityPercent = -1;
     /** 
      * to handle user limits, we need to know how many users have jobs in 
      * the queue.
      */  
     Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
       
-    /** min value of user limit (same for all users) */
+    /**
+     * min value of user limit (same for all users)
+     */
     int ulMin;
     
     /**
@@ -218,21 +218,22 @@ class CapacityTaskScheduler extends Task
     TaskSchedulingInfo mapTSI;
     TaskSchedulingInfo reduceTSI;
     
-    public QueueSchedulingInfo(String queueName, float capacityPercent,
-                               int ulMin, JobQueuesManager jobQueuesManager,
-                               int mapCap, int reduceCap) {
+    public QueueSchedulingInfo(
+      String queueName, float capacityPercent,
+      float maxCapacityPercent, int ulMin, JobQueuesManager jobQueuesManager
+    ) {
       this.queueName = new String(queueName);
       this.capacityPercent = capacityPercent;
+      this.maxCapacityPercent = maxCapacityPercent;
       this.ulMin = ulMin;
       this.jobQueuesManager = jobQueuesManager;
       this.mapTSI = new TaskSchedulingInfo();
       this.reduceTSI = new TaskSchedulingInfo();
-      this.mapTSI.setMaxTaskLimit(mapCap);
-      this.reduceTSI.setMaxTaskLimit(reduceCap);
     }
     
     /**
      * return information about the queue
+     *
      * @return a String representing the information about the queue.
      */
     @Override
@@ -508,6 +509,10 @@ class CapacityTaskScheduler extends Task
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
+        //Check if queue is over maximum-capacity
+        if(this.areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+          continue;
+        }
         // check if the job's user is over limit
         if (isUserOverLimit(j, qsi)) {
           continue;
@@ -572,6 +577,12 @@ class CapacityTaskScheduler extends Task
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
+        //Check if queue is over maximum-capacity
+        if (this.areTasksInQueueOverMaxCapacity(
+          qsi, j.getNumSlotsPerTask(type))) {
+          continue;
+        }
+        
         if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
             taskTrackerStatus)) {
           // We found a suitable job. Get task from it.
@@ -657,8 +668,10 @@ class CapacityTaskScheduler extends Task
         if (0 == getTSI(qsi).getCapacity()) {
           continue;
         }
-        
-        if(this.areTasksInQueueOverLimit(qsi)) {
+
+        //This call is an optimization: if we are already over the
+        //maximum-capacity, we skip looking for a task in this queue.
+        if(this.areTasksInQueueOverMaxCapacity(qsi,1)) {
           continue;
         }
         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -685,22 +698,32 @@ class CapacityTaskScheduler extends Task
 
 
     /**
-     * Check if the max task limit is set for this queue
-     * if set , ignore this qsi if current num of occupied
-     * slots  of a TYPE in the queue is >= getMaxTaskCap().
+     * Check if maximum-capacity is set for this queue.
+     * If it is set (i.e. non-negative), check whether numSlotsOccupied +
+     * numSlotsPerTask is greater than maximum-capacity; if so, this queue
+     * is over its limit.
+     *
+     * In case numSlotsOccupied is less than maximum-capacity, but
+     * numSlotsOccupied + numSlotsPerTask is more than maximum-capacity, we
+     * still don't assign the task. This may lead to under-utilization of a
+     * very small set of slots, but this is OK, as we strictly respect the
+     * maximum-capacity limit.
+     * 
      * @param qsi
-     * @return
+     * @return true if queue is over limit.
      */
 
-    private boolean areTasksInQueueOverLimit(QueueSchedulingInfo qsi) {
+    private boolean areTasksInQueueOverMaxCapacity(
+      QueueSchedulingInfo qsi, int numSlotsPerTask) {
       TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.getMaxTaskLimit() >= 0) {
-        if (tsi.numSlotsOccupied >= tsi.getCapacity()) {
+      if (tsi.getMaxCapacity() >= 0) {
+        if ((tsi.numSlotsOccupied + numSlotsPerTask) > tsi.getMaxCapacity()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
-              "Queue " + qsi.queueName + " has reached its  max " + type +
-                " limit ");
+              "Queue " + qsi.queueName + " " + "has reached its  max " + type +
+                "Capacity");
             LOG.debug("Current running tasks " + tsi.getCapacity());
+
           }
           return true;
         }
@@ -719,12 +742,12 @@ class CapacityTaskScheduler extends Task
           s.append(
             String.format(
               " Queue '%s'(%s): runningTasks=%d, "
-                + "occupiedSlots=%d, capacity=%d, runJobs=%d  maxTaskLimit=%d ",
+                + "occupiedSlots=%d, capacity=%d, runJobs=%d  maxCapacity=%d ",
               qsi.queueName,
               this.type, Integer.valueOf(tsi.numRunningTasks), Integer
                 .valueOf(tsi.numSlotsOccupied), Integer
                 .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
-              Integer.valueOf(tsi.getMaxTaskLimit())));
+              Integer.valueOf(tsi.getMaxCapacity())));
         }
         LOG.debug(s);
       }
@@ -792,7 +815,7 @@ class CapacityTaskScheduler extends Task
     @Override
     int getSlotsPerTask(JobInProgress job) {
       return 
-        job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());    
+        job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());
     }
 
     @Override
@@ -1051,19 +1074,20 @@ class CapacityTaskScheduler extends Task
     }
 
     Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
-    float totalCapacity = 0.0f;
+    float totalCapacityPercent = 0.0f;
     for (String queueName: queues) {
-      float capacity = schedConf.getCapacity(queueName);
-      if(capacity == -1.0) {
+      float capacityPercent = schedConf.getCapacity(queueName);
+      if (capacityPercent == -1.0) {
         queuesWithoutConfiguredCapacity.add(queueName);
       }else {
-        totalCapacity += capacity;
+        totalCapacityPercent += capacityPercent;
       }
+
+      float maxCapacityPercent = schedConf.getMaxCapacity(queueName);
       int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
       // create our QSI and add to our hashmap
       QueueSchedulingInfo qsi = new QueueSchedulingInfo(
-        queueName, capacity, ulMin, jobQueuesManager, schedConf.getMaxMapCap(
-          queueName), schedConf.getMaxReduceCap(queueName));
+        queueName, capacityPercent, maxCapacityPercent ,ulMin, jobQueuesManager);
       queueInfoMap.put(queueName, qsi);
 
       // create the queues of job objects
@@ -1075,18 +1099,27 @@ class CapacityTaskScheduler extends Task
       queueManager.setSchedulerInfo(queueName, schedulingInfo);
       
     }
-    float remainingQuantityToAllocate = 100 - totalCapacity;
+    float remainingQuantityToAllocate = 100 - totalCapacityPercent;
     float quantityToAllocate = 
       remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
     for(String queue: queuesWithoutConfiguredCapacity) {
       QueueSchedulingInfo qsi = queueInfoMap.get(queue); 
       qsi.capacityPercent = quantityToAllocate;
+      if(qsi.maxCapacityPercent >= 0) {
+        if(qsi.capacityPercent > qsi.maxCapacityPercent) {
+          throw new IllegalStateException(
+            " Allocated capacity of " + qsi.capacityPercent +
+              " to unconfigured queue " + qsi.queueName +
+              " is greater than maximum Capacity " + qsi.maxCapacityPercent);
+        }
+      }
       schedConf.setCapacity(queue, quantityToAllocate);
     }    
     
-    if (totalCapacity > 100.0) {
-      throw new IllegalArgumentException("Sum of queue capacities over 100% at "
-                                         + totalCapacity);
+    if (totalCapacityPercent > 100.0) {
+      throw new IllegalArgumentException(
+        "Sum of queue capacities over 100% at "
+          + totalCapacityPercent);
     }    
     
     // let our mgr objects know about the queues
@@ -1151,22 +1184,37 @@ class CapacityTaskScheduler extends Task
    * to make scheduling decisions. For example, we don't need an exact count
    * of numRunningTasks. Once we count upto the grid capacity, any
    * number beyond that will make no difference.
-   *
-   **/
-  private synchronized void updateQSIObjects(int mapClusterCapacity,
+   */
+  private synchronized void updateQSIObjects(
+    int mapClusterCapacity,
       int reduceClusterCapacity) {
     // if # of slots have changed since last time, update.
     // First, compute whether the total number of TT slots have changed
     for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
       // compute new capacities, if TT slots have changed
       if (mapClusterCapacity != prevMapClusterCapacity) {
-        qsi.mapTSI.setCapacity((int)
+        qsi.mapTSI.setCapacity(
+          (int)
           (qsi.capacityPercent*mapClusterCapacity/100));
+
+        //compute new max map capacities
+        if(qsi.maxCapacityPercent > 0) {
+          qsi.mapTSI.setMaxCapacity(
+            (int) (qsi.maxCapacityPercent * mapClusterCapacity / 100));
+        }
       }
       if (reduceClusterCapacity != prevReduceClusterCapacity) {
-        qsi.reduceTSI.setCapacity((int)
-          (qsi.capacityPercent*reduceClusterCapacity/100));
+        qsi.reduceTSI.setCapacity(
+          (int)
+            (qsi.capacityPercent * reduceClusterCapacity / 100));
+
+        //compute new max reduce capacities
+        if (qsi.maxCapacityPercent > 0) {
+          qsi.reduceTSI.setMaxCapacity(
+            (int) (qsi.maxCapacityPercent * reduceClusterCapacity / 100));
+        }
       }
+
       // reset running/pending tasks, tasks per user
       qsi.mapTSI.resetTaskVars();
       qsi.reduceTSI.resetTaskVars();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 03:31:50 2011
@@ -36,7 +36,6 @@ import org.apache.commons.logging.LogFac
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.mapreduce.TaskType;
@@ -864,83 +863,45 @@ public class TestCapacityScheduler exten
   }
 
   /**
-   * Test the max map limit.
+   * Test the max Capacity for map and reduce
    * @throws IOException
    */
-  public void testMaxMapCap() throws IOException {
+  public void testMaxCapacities() throws IOException {
     this.setUp(4,1,1);
     taskTrackerManager.addQueues(new String[] {"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default",2);
-    resConf.setMaxReduceCap("default",-1);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    //submit the Job
-    FakeJobInProgress fjob1 =
-      submitJobAndInit(JobStatus.PREP,3,1,"default","user");
-
-    List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
-    List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-
-    //Once the 2 tasks are running the third assigment should be reduce.
-    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
-    //This should fail.
-    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
-    //Now complete the task 1.
-        // complete the job
-    taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
-                                  fjob1);
-    //We have completed the tt1 task which was a map task so we expect one map
-    //task to be picked up
-    checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
-  }
+    queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
 
-  /**
-   * Test max reduce limit
-   * @throws IOException
-   */
-  public void testMaxReduceCap() throws IOException {
-    this.setUp(4, 1, 1);
-    taskTrackerManager.addQueues(new String[]{"default"});
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
     resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default", -1);
-    resConf.setMaxReduceCap("default", 2);
+    resConf.setMaxCapacity("default", 50.0f);
     scheduler.setResourceManagerConf(resConf);
+    scheduler.setAssignMultipleTasks(true);
     scheduler.start();
 
     //submit the Job
     FakeJobInProgress fjob1 =
-      submitJobAndInit(JobStatus.PREP, 1, 3, "default", "user");
+      submitJobAndInit(JobStatus.PREP, 4, 4, "default", "user");
 
-    List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
-    List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-    List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
-
-    //This should fail. 1 map, 2 reduces , we have reached the limit.
-    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
-    //Now complete the task 1 i.e map task.
-    // complete the job
-    taskTrackerManager.finishTask(
-      "tt1", task1.get(0).getTaskID().toString(),
-      fjob1);
-
-    //This should still fail as only map task is done
-    task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
-
-    //Complete the reduce task
-    taskTrackerManager.finishTask(
-      "tt2", task2.get(0).getTaskID().toString(), fjob1);
+    //default queue has min capacity of 1 and max capacity of 2
 
-    //One reduce is done hence assign the new reduce.
-    checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
+    //first call of assign task should give task from default queue.
+    //default uses 1 map slot and 1 reduce slot
+    checkMultipleAssignment(
+      "tt1", "attempt_test_0001_m_000001_0 on tt1",
+      "attempt_test_0001_r_000001_0 on tt1");
+
+    //second call of assign task
+    //default uses 2 map and 2 reduce slots
+    checkMultipleAssignment(
+      "tt2", "attempt_test_0001_m_000002_0 on tt2",
+      "attempt_test_0001_r_000002_0 on tt2");
+
+
+    //Now we have reached the max capacity limit for default ,
+    //no further tasks would be assigned to this queue.
+    checkMultipleAssignment(
+      "tt3", null,
+      null);
   }
   
   // test if the queue reflects the changes
@@ -1295,6 +1256,27 @@ public class TestCapacityScheduler exten
     assertEquals(18.75f, resConf.getCapacity("q4"));
   }
 
+  public void testCapacityAllocFailureWithLowerMaxCapacity()
+    throws Exception {
+    String[] qs = {"default", "q1"};
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
+    queues.add(new FakeQueueInfo("q1", -1.0f, true, 50));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxCapacity("q1", 40.0f);
+    scheduler.setResourceManagerConf(resConf);
+    try {
+      scheduler.start();
+      fail("Scheduler start should fail ");
+    } catch (IllegalStateException ise) {
+      assertEquals(
+        ise.getMessage(),
+        " Allocated capacity of " + 50.0f + " to unconfigured queue " +
+          "q1" + " is greater than maximum Capacity " + 40.0f);
+    }
+  }
+
   // Tests how capacity is computed and assignment of tasks done
   // on the basis of the capacity.
   public void testCapacityBasedAllocation() throws Exception {
@@ -1377,26 +1359,26 @@ public class TestCapacityScheduler exten
   }
 
   /**
-   * Creates a queue with max task limit of 2
+   * Creates a queue with a maximum capacity of 50%.
    * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
    * given to high ram job and are reserved , no other tasks are accepted .
    *
    * @throws IOException
    */
-  public void testHighMemoryBlockingWithMaxLimit()
+  public void testHighMemoryBlockingWithMaxCapacity()
       throws IOException {
 
-    // 2 map and 1 reduce slots
-    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
 
     taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
+    queues.add(new FakeQueueInfo("defaultXYZ", 25.0f, true, 50));
     resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("defaultXYZ",2);
+
+    //defaultXYZ can go up to 2 map and 2 reduce slots
+    resConf.setMaxCapacity("defaultXYZ", 50.0f);
+
     scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enabled memory-based scheduling
-    // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
         JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
         2 * 1024);
@@ -1404,74 +1386,92 @@ public class TestCapacityScheduler exten
         JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.getConf().setLong(
         JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
-        1 * 1024);
+        2 * 1024);
     scheduler.getConf().setLong(
         JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
+    scheduler.setAssignMultipleTasks(true);
 
-    // The situation :  Submit 2 jobs with high memory map task
-    //Set the max limit for queue to 2 ,
-    // try submitting more map tasks to the queue , it should not happen
-
-    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
-        + "2 map tasks");
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(2 * 1024);
-    jConf.setMemoryForReduceTask(0);
+    jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(2);
-    jConf.setNumReduceTasks(0);
+    jConf.setNumReduceTasks(1);
     jConf.setQueueName("defaultXYZ");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
-        + "2 map/red tasks");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
-    jConf.setMemoryForReduceTask(1 * 1024);
-    jConf.setNumMapTasks(2);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("defaultXYZ");
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // first, a map from j1 will run this is a high memory job so it would
-    // occupy the 2 slots
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
 
-    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+    //high ram map from job 1 and normal reduce task from job 1
+    List<Task> tasks = checkMultipleAssignment(
+      "tt1", "attempt_test_0001_m_000001_0 on tt1",
+      "attempt_test_0001_r_000001_0 on tt1");
 
-    // at this point, the scheduler tries to schedule another map from j1.
-    // there isn't enough space. The second job's reduce should be scheduled.
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    
-    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
+    checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 1, 100.0f,0,2);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
 
-    //at this point , the scheduler tries to schedule another map from j2 for
-    //another task tracker.
-    // This should not happen as all the map slots are taken
-    //by the first task itself.hence reduce task from the second job is given
-
-    checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
+    //We have reached the maximum capacity for maps, so no more map tasks.
+    //1 reduce slot is already in use and only 1 more reduce slot is left
+    //before we reach the maximum capacity for reduces.
+    //But the current 1 slot plus the 2 slots needed by the high ram reduce
+    //would exceed the maximum capacity, hence nothing is assigned
+    //in this call.
+    checkMultipleAssignment("tt2",null,null);
+
+    //complete the tasks running on tt1 (the high ram map and the reduce).
+    for (Task task : tasks) {
+      taskTrackerManager.finishTask(
+        "tt1", task.getTaskID().toString(),
+        job1);
+    }
+
+    //Now tt2 gets 1 high ram map (from job1) and 1 high ram reduce (from job2).
+    List<Task> t2 = checkMultipleAssignment(
+      "tt2", "attempt_test_0001_m_000002_0 on tt2",
+      "attempt_test_0002_r_000001_0 on tt2");
+
+    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
+    checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 2, 200.0f,0,2);
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+
+    //complete the high ram tasks running on tt2.
+    for (Task task : t2) {
+      taskTrackerManager.finishTask(
+        "tt2", task.getTaskID().toString(),
+        job2);
+    }
+
+    //1st map & 2nd reduce from job2
+    checkMultipleAssignment(
+      "tt2", "attempt_test_0002_m_000001_0 on tt2",
+      "attempt_test_0002_r_000002_0 on tt2");
   }
 
   /**
    *   test if user limits automatically adjust to max map or reduce limit
    */
-  public void testUserLimitsWithMaxLimits() throws Exception {
-    setUp(4, 4, 4);
+  public void testUserLimitsWithMaxCapacities() throws Exception {
+    setUp(2, 2, 2);
     // set up some queues
     String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
     resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default", 2);
-    resConf.setMaxReduceCap("default", 2);
+    resConf.setMaxCapacity("default", 75.0f);
     scheduler.setResourceManagerConf(resConf);
+    scheduler.setAssignMultipleTasks(true);
     scheduler.start();
 
     // submit a job
@@ -1480,37 +1480,27 @@ public class TestCapacityScheduler exten
     FakeJobInProgress fjob2 =
       submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
 
-    // for queue 'default', the capacity for maps is 2.
-    // But the max map limit is 2
-    // hence user should be getting not more than 1 as it is the 50%.
-    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
-    //Now we should get the task from the other job. As the
-    //first user has reached his max map limit.
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-
-    //Now we are done with map limit , now if we ask for task we should
-    // get reduce from 1st job
-    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
-    // Now we're at full capacity for maps. 1 done with reduces for job 1 so
-    // now we should get 1 reduces for job 2
-    Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
-
-    taskTrackerManager.finishTask(
-      "tt1", t1.getTaskID().toString(),
-      fjob1);
-
-    //tt1 completed the task so we have 1 map slot for u1
-    // we are assigning the 2nd map task from fjob1
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-
-    taskTrackerManager.finishTask(
-      "tt4", t4.getTaskID().toString(),
-      fjob2);
-    //tt4 completed the task , so we have 1 reduce slot for u2
-    //we are assigning the 2nd reduce from fjob2
-    checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
+    // for queue 'default', maxCapacity for maps and reduces is 3 slots each.
+    // With a 50% user limit (i.e. assuming 2 users per queue), the initial
+    // limit per user is 1 map and 1 reduce.
+    // Computed against max capacity, it becomes 1.5 each.
+
+    //The first job is given 1 map and 1 reduce task.
+    List<Task> t1 = this.checkMultipleAssignment(
+      "tt1", "attempt_test_0001_m_000001_0 on tt1",
+      "attempt_test_0001_r_000001_0 on tt1");
+
+    //User u1 has reached its limit of 1 map and 1 reduce, so the next
+    //map and reduce tasks come from the job of user u2.
+    List<Task> t2 = this.checkMultipleAssignment(
+      "tt1", "attempt_test_0002_m_000001_0 on tt1",
+      "attempt_test_0002_r_000001_0 on tt1");
+
+    t1 = this.checkMultipleAssignment(
+      "tt2", "attempt_test_0001_m_000002_0 on tt2",
+      "attempt_test_0001_r_000002_0 on tt2");
 
+    t1 = this.checkMultipleAssignment("tt2", null,null);
   }
 
 
@@ -3085,20 +3075,22 @@ public class TestCapacityScheduler exten
   private void checkMemReservedForTasksOnTT(String taskTracker,
       Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
     Long observedMemForMapsOnTT =
-        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+      scheduler.memoryMatcher.getMemReservedForTasks(
+        tracker(taskTracker).getStatus(),
             TaskType.MAP);
     Long observedMemForReducesOnTT =
-        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+      scheduler.memoryMatcher.getMemReservedForTasks(
+        tracker(taskTracker).getStatus(),
             TaskType.REDUCE);
     if (expectedMemForMapsOnTT == null) {
-      assertTrue(observedMemForMapsOnTT == null);
+      assertEquals(observedMemForMapsOnTT, null);
     } else {
-      assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+      assertEquals(observedMemForMapsOnTT, (expectedMemForMapsOnTT));
     }
     if (expectedMemForReducesOnTT == null) {
-      assertTrue(observedMemForReducesOnTT == null);
+      assertEquals(observedMemForReducesOnTT, null);
     } else {
-      assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+      assertEquals(observedMemForReducesOnTT, (expectedMemForReducesOnTT));
     }
   }
 
@@ -3187,4 +3179,48 @@ public class TestCapacityScheduler exten
     assertEquals(scheduler.getLimitMaxMemForMapSlot(),3);
     assertEquals(scheduler.getLimitMaxMemForReduceSlot(),3);
   }
+
+  /**
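+   * Calls assignTasks on the given task tracker and checks that the
+   * returned map and reduce attempts match the expected ones.
+   *
+   * @param taskTrackerName name of the tracker passed to assignTasks
+   * @param mapAttempt expected map attempt, or null if no map is expected
+   * @param reduceAttempt expected reduce attempt, or null if no reduce is
+   *        expected
+   * @return the tasks returned by assignTasks (null or empty if none)
+   * @throws IOException propagated from assignTasks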
+   */
+  private List<Task> checkMultipleAssignment(
+    String taskTrackerName, String mapAttempt, String reduceAttempt)
+    throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    LOG.info(
+      " mapAttempt " + mapAttempt + " reduceAttempt " + reduceAttempt +
+        " assignTasks result " + tasks);
+
+    if (tasks == null || tasks.isEmpty()) {
+      if (mapAttempt != null || reduceAttempt != null ) {
+        fail(
+          " improper attempt " + tasks + " expected attempts are  map : " +
+            mapAttempt + " reduce : " + reduceAttempt);
+      } else {
+        return tasks;
+      }
+    }
+    
+    if (tasks.size() == 1 && (mapAttempt != null && reduceAttempt != null)) {
+      fail(
+        " improper attempt " + tasks + " expected attempts are  map : " +
+          mapAttempt + " reduce : " + reduceAttempt);
+    }
+    for (Task task : tasks) {
+      if (task.toString().contains("_m_")) {
+        assertEquals(task.toString(), mapAttempt);
+      }
+
+      if (task.toString().contains("_r")) {
+        assertEquals(task.toString(), reduceAttempt);
+      }
+    }
+    return tasks;
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Fri Mar  4 03:31:50 2011
@@ -278,6 +278,26 @@ public class TestCapacitySchedulerConf e
     }
   }
   
+  public void testInvalidMaxCapacity() throws IOException {
+    openFile();
+    startConfig();
+    writeProperty(
+      "mapred.capacity-scheduler.queue.default.capacity", "70");
+    writeProperty(
+      "mapred.capacity-scheduler.queue.default.maximum-capacity", "50");
+    endConfig();
+    testConf = new CapacitySchedulerConf(new Path(testConfFile));
+
+    try {
+      testConf.getMaxCapacity("default");
+      fail(" getMaxCapacity worked " + testConf.getCapacity("default"));
+    } catch (IllegalArgumentException e) {
+      assertEquals(
+        CapacitySchedulerConf.MAX_CAPACITY_PROPERTY + " 50.0"+
+          " for a queue should be greater than or equal to capacity ", e.getMessage());
+    }
+  }
+  
   public void testInitializationPollerProperties() 
     throws Exception {
     /*

Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml Fri Mar  4 03:31:50 2011
@@ -204,36 +204,28 @@
           	users, no user can use more than 25% of the queue's resources. A 
           	value of 100 implies no user limits are imposed.</td>
           </tr>
-          <tr><td>mapred.capacity-scheduler.queue.&lt;queue-name&gt;.max.map.slots</td>
+          <tr><td>mapred.capacity-scheduler.queue.&lt;queue-name&gt;.maximum-capacity</td>
           	<td>
-		    This value is the maximum max slots that can be used in a
-		    queue at any point of time. So for example assuming above config value
-		    is 100 , not more than 100 tasks would be in the queue at any point of
-		    time, assuming each task takes one slot.
+                  maximum-capacity defines a limit beyond which a queue cannot
+                  use the capacity of the cluster. This provides a means to limit
+                  how much excess capacity a queue can use. By default, there
+                  is no limit.
+                  The maximum-capacity of a queue can only be greater than or
+                  equal to its minimum capacity.
+                  The default value of -1 implies a queue can use the complete
+                  capacity of the cluster.
 
-		    Default value of -1 would disable this capping feature
+                  This property can be used to prevent certain long-running
+                  jobs from occupying more than a certain percentage of the
+                  cluster, which, in the absence of pre-emption, could affect
+                  the capacity guarantees of other queues.
 
-		    Typically the queue capacity should be equal to this limit.
-		    If queue capacity is more than this limit, excess capacity will be
-		    used by the other queues. If queue capacity is less than the above
-		    limit , then the limit would be the queue capacity - as in the current
-		    implementation
-                </td>
-          </tr>
-          <tr><td>mapred.capacity-scheduler.queue.&lt;queue-name&gt;.max.reduce.slots</td>
-          	<td>
-		    This value is the maximum reduce slots that can be used in a
-		    queue at any point of time. So for example assuming above config value
-		    is 100 , not more than 100 tasks would be in the queue at any point of
-		    time, assuming each task takes one slot.
-
-		    Default value of -1 would disable this capping feature
-
-		    Typically the queue capacity should be equal to this limit.
-		    If queue capacity is more than this limit, excess capacity will be
-		    used by the other queues. If queue capacity is less than the above
-		    limit , then the limit would be the queue capacity - as in the current
-		    implementation
+                  One important thing to note is that maximum-capacity is a
+                  percentage, so its absolute value changes with the cluster's
+                  capacity. If a large number of nodes or racks are added
+                  to the cluster, the maximum capacity in absolute terms
+                  increases accordingly.
                 </td>
           </tr>
         </table>



Mime
View raw message