hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r771607 [1/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Date Tue, 05 May 2009 07:30:20 GMT
Author: yhemanth
Date: Tue May  5 07:30:20 2009
New Revision: 771607

URL: http://svn.apache.org/viewvc?rev=771607&view=rev
Log:
HADOOP-5726. Remove pre-emption from capacity scheduler code base. Contributed by Rahul Kumar
Singh.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/capacity-scheduler.xml.template
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=771607&r1=771606&r2=771607&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May  5 07:30:20 2009
@@ -527,6 +527,9 @@
 
   INCOMPATIBLE CHANGES
 
+    HADOOP-5726. Remove pre-emption from capacity scheduler code base.
+    (Rahul Kumar Singh via yhemanth)
+
   NEW FEATURES
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=771607&r1=771606&r2=771607&view=diff
==============================================================================
--- hadoop/core/trunk/conf/capacity-scheduler.xml.template (original)
+++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Tue May  5 07:30:20 2009
@@ -8,22 +8,14 @@
 <configuration>
 
   <property>
-    <name>mapred.capacity-scheduler.queue.default.guaranteed-capacity</name>
+    <name>mapred.capacity-scheduler.queue.default.capacity</name>
     <value>100</value>
     <description>Percentage of the number of slots in the cluster that are
-      guaranteed to be available for jobs in this queue.
+      to be available for jobs in this queue.
     </description>    
   </property>
   
   <property>
-    <name>mapred.capacity-scheduler.queue.default.reclaim-time-limit</name>
-    <value>300</value>
-    <description>The amount of time, in seconds, before which 
-      resources distributed to other queues will be reclaimed.
-    </description>
-  </property>
-
-  <property>
     <name>mapred.capacity-scheduler.queue.default.supports-priority</name>
     <value>false</value>
     <description>If true, priorities of jobs will be taken into 
@@ -54,29 +46,10 @@
     </description>
   </property>
   
-  
-  <property>
-    <name>mapred.capacity-scheduler.reclaimCapacity.interval</name>
-    <value>5</value>
-    <description>The time interval, in seconds, between which the scheduler
-     periodically determines whether capacity needs to be reclaimed for 
-     any queue.
-    </description>
-  </property>
-  
   <!-- The default configuration settings for the capacity task scheduler -->
   <!-- The default values would be applied to all the queues which don't have -->
   <!-- the appropriate property for the particular queue -->
   <property>
-    <name>mapred.capacity-scheduler.default-reclaim-time-limit</name>
-    <value>300</value>
-    <description>The amount of time, in seconds, before which 
-    resources distributed to other queues will be reclaimed by default
-    in a job queue.
-    </description>
-  </property>
-  
-  <property>
     <name>mapred.capacity-scheduler.default-supports-priority</name>
     <value>false</value>
     <description>If true, priorities of jobs will be taken into 

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=771607&r1=771606&r2=771607&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Tue May  5 07:30:20 2009
@@ -34,8 +34,6 @@
   /** Default file name from which the resource manager configuration is read. */ 
   public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
   
-  private int defaultReclaimTime;
-  
   private int defaultUlimitMinimum;
   
   private boolean defaultSupportPriority;
@@ -117,8 +115,6 @@
    * which is used by the Capacity Scheduler.
    */
   private void initializeDefaults() {
-    defaultReclaimTime = rmConf.getInt(
-        "mapred.capacity-scheduler.default-reclaim-time-limit",300);
     defaultUlimitMinimum = rmConf.getInt(
         "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
     defaultSupportPriority = rmConf.getBoolean(
@@ -129,33 +125,33 @@
   }
   
   /**
-   * Get the guaranteed percentage of the cluster for the specified queue.
+   * Get the percentage of the cluster for the specified queue.
    * 
-   * This method defaults to configured default Guaranteed Capacity if
+   * This method defaults to configured default Capacity if
    * no value is specified in the configuration for this queue. 
    * If the configured capacity is negative value or greater than 100 an
    * {@link IllegalArgumentException} is thrown.
    * 
-   * If default Guaranteed capacity is not configured for a queue, then
+   * If default capacity is not configured for a queue, then
    * system allocates capacity based on what is free at the time of 
    * capacity scheduler start
    * 
    * 
    * @param queue name of the queue
-   * @return guaranteed percent of the cluster for the queue.
+   * @return percent of the cluster for the queue.
    */
-  public float getGuaranteedCapacity(String queue) {
-    //Check done in order to return default GC which can be negative
-    //In case of both GC and default GC not configured.
+  public float getCapacity(String queue) {
+    //Check done in order to return default capacity which can be negative
+    //In case of both capacity and default capacity not configured.
     //Last check is if the configuration is specified and is marked as
     //negative we throw exception
     String raw = rmConf.getRaw(toFullPropertyName(queue, 
-        "guaranteed-capacity"));
+        "capacity"));
     if(raw == null) {
       return -1;
     }
     float result = rmConf.getFloat(toFullPropertyName(queue, 
-                                   "guaranteed-capacity"), 
+                                   "capacity"), 
                                    -1);
     if (result < 0.0 || result > 100.0) {
       throw new IllegalArgumentException("Illegal capacity for queue " + queue +
@@ -165,53 +161,13 @@
   }
   
   /**
-   * Sets the Guaranteed capacity of the given queue.
-   * 
-   * @param queue name of the queue
-   * @param gc guaranteed percent of the cluster for the queue.
-   */
-  public void setGuaranteedCapacity(String queue,float gc) {
-    rmConf.setFloat(toFullPropertyName(queue, "guaranteed-capacity"),gc);
-  }
-  
-  
-  /**
-   * Get the amount of time before which redistributed resources must be
-   * reclaimed for the specified queue.
-   * 
-   * The resource manager distributes spare capacity from a free queue
-   * to ones which are in need for more resources. However, if a job 
-   * submitted to the first queue requires back the resources, they must
-   * be reclaimed within the specified configuration time limit.
-   * 
-   * This method defaults to configured default reclaim time limit if
-   * no value is specified in the configuration for this queue.
-   * 
-   * Throws an {@link IllegalArgumentException} when invalid value is 
-   * configured.
+   * Sets the capacity of the given queue.
    * 
    * @param queue name of the queue
-   * @return reclaim time limit for this queue.
-   */
-  public int getReclaimTimeLimit(String queue) {
-    int reclaimTimeLimit = rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"),
-        defaultReclaimTime);
-    if(reclaimTimeLimit <= 0) {
-      throw new IllegalArgumentException("Invalid reclaim time limit : " 
-          + reclaimTimeLimit + " for queue : " + queue);
-    }
-    return reclaimTimeLimit;
-  }
-  
-  /**
-   * Set the amount of time before which redistributed resources must be
-   * reclaimed for the specified queue.
-   * @param queue Name of the queue
-   * @param value Amount of time before which the redistributed resources
-   * must be retained.
+   * @param gc percent of the cluster for the queue.
    */
-  public void setReclaimTimeLimit(String queue, int value) {
-    rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
+  public void setCapacity(String queue,float gc) {
+    rmConf.setFloat(toFullPropertyName(queue, "capacity"),gc);
   }
   
   /**
@@ -435,30 +391,4 @@
   public void setDefaultPercentOfPmemInVmem(float value) {
     rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
   }
-  
-  /**
-   * Gets the reclaim capacity thread interval.
-   * 
-   * @return reclaim capacity interval
-   */
-
-  public long getReclaimCapacityInterval() {
-    long reclaimCapacityInterval = 
-      rmConf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
-    
-    if(reclaimCapacityInterval <= 0) {
-      throw new IllegalArgumentException("Invalid reclaim capacity " +
-      		"interval, should be greater than zero");
-    }
-    return reclaimCapacityInterval;
-  }
-  /**
-   * Sets the reclaim capacity thread interval.
-   * 
-   * @param value
-   */
-  public void setReclaimCapacityInterval(long value) {
-    rmConf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", 
-        value);
-  }
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=771607&r1=771606&r2=771607&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May  5 07:30:20 2009
@@ -42,15 +42,12 @@
  * and provides a HOD-less way to share large clusters. This scheduler 
  * provides the following features: 
  *  * support for queues, where a job is submitted to a queue. 
- *  * Queues are guaranteed a fraction of the capacity of the grid (their 
- *  'guaranteed capacity') in the sense that a certain capacity of resources 
+ *  * Queues are assigned a fraction of the capacity of the grid (their
+ *  'capacity') in the sense that a certain capacity of resources 
  *  will be at their disposal. All jobs submitted to the queues of an Org 
- *  will have access to the capacity guaranteed to the Org.
- *  * Free resources can be allocated to any queue beyond its guaranteed 
- *  capacity. These excess allocated resources can be reclaimed and made 
- *  available to another queue in order to meet its capacity guarantee.
- *  * The scheduler guarantees that excess resources taken from a queue will 
- *  be restored to it within N minutes of its need for them.
+ *  will have access to the capacity to the Org.
+ *  * Free resources can be allocated to any queue beyond its 
+ *  capacity.
  *  * Queues optionally support job priorities (disabled by default). 
  *  * Within a queue, jobs with higher priority will have access to the 
  *  queue's resources before jobs with lower priority. However, once a job 
@@ -62,42 +59,11 @@
  */
 class CapacityTaskScheduler extends TaskScheduler {
   
-  /** 
-   * For keeping track of reclaimed capacity. 
-   * Whenever slots need to be reclaimed, we create one of these objects. 
-   * As the queue gets slots, the amount to reclaim gets decremented. if 
-   * we haven't reclaimed enough within a certain time, we need to kill 
-   * tasks. This object 'expires' either if all resources are reclaimed
-   * before the deadline, or the deadline passes . 
-   */
-  private static class ReclaimedResource {
-    // how much resource to reclaim
-    public int originalAmount;
-    // how much is to be reclaimed currently
-    public int currentAmount;
-    // the time, in millisecs, when this object expires. 
-    // This time is equal to the time when the object was created, plus
-    // the reclaim-time SLA for the queue.  
-    public long whenToExpire;
-    // we also keep track of when to kill tasks, in millisecs. This is a 
-    // fraction of 'whenToExpire', but we store it here so we don't 
-    // recompute it every time. 
-    public long whenToKill;
-    
-    public ReclaimedResource(int amount, long expiryTime, 
-        long whenToKill) {
-      this.originalAmount = amount;
-      this.currentAmount = amount;
-      this.whenToExpire = expiryTime;
-      this.whenToKill = whenToKill;
-    }
-  }
-
   /***********************************************************************
    * Keeping track of scheduling information for queues
    * 
    * We need to maintain scheduling information relevant to a queue (its 
-   * name, guaranteed capacity, etc), along with information specific to 
+   * name, capacity, etc), along with information specific to 
    * each kind of task, Map or Reduce (num of running tasks, pending 
    * tasks etc). 
    * 
@@ -115,58 +81,18 @@
      * the actual gc, which depends on how many slots are available
      * in the cluster at any given time. 
      */
-    int guaranteedCapacity = 0;
+    int capacity = 0;
     // number of running tasks
     int numRunningTasks = 0;
-    // number of pending tasks
-    int numPendingTasks = 0;
     /** for each user, we need to keep track of number of running tasks */
     Map<String, Integer> numRunningTasksByUser = 
       new HashMap<String, Integer>();
     
     /**
-     * We need to keep track of resources to reclaim. 
-     * Whenever a queue is under capacity and has tasks pending, we offer it 
-     * an SLA that gives it free slots equal to or greater than the gap in 
-     * its capacity, within a period of time (reclaimTime). 
-     * To do this, we periodically check if queues need to reclaim capacity. 
-     * If they do, we create a ResourceReclaim object. We also periodically
-     * check if a queue has received enough free slots within, say, 80% of 
-     * its reclaimTime. If not, we kill enough tasks to make up the 
-     * difference. 
-     * We keep two queues of ResourceReclaim objects. when an object is 
-     * created, it is placed in one queue. Once we kill tasks to recover 
-     * resources for that object, it is placed in an expiry queue. we need
-     * to do this to prevent creating spurious ResourceReclaim objects. We 
-     * keep a count of total resources that are being reclaimed. This count 
-     * is decremented when an object expires. 
-     */
-    
-    /**
-     * the list of resources to reclaim. This list is always sorted so that
-     * resources that need to be reclaimed sooner occur earlier in the list.
-     */
-    LinkedList<ReclaimedResource> reclaimList = 
-      new LinkedList<ReclaimedResource>();
-    /**
-     * the list of resources to expire. This list is always sorted so that
-     * resources that need to be expired sooner occur earlier in the list.
-     */
-    LinkedList<ReclaimedResource> reclaimExpireList = 
-      new LinkedList<ReclaimedResource>();
-    /** 
-     * sum of all resources that are being reclaimed. 
-     * We keep this to prevent unnecessary ReclaimResource objects from being
-     * created.  
-     */
-    int numReclaimedResources = 0;
-    
-    /**
      * reset the variables associated with tasks
      */
     void resetTaskVars() {
       numRunningTasks = 0;
-      numPendingTasks = 0;
       for (String s: numRunningTasksByUser.keySet()) {
         numRunningTasksByUser.put(s, 0);
       }
@@ -176,11 +102,11 @@
      * return information about the tasks
      */
     public String toString(){
-      float runningTasksAsPercent = guaranteedCapacity!= 0 ? 
-          ((float)numRunningTasks * 100/guaranteedCapacity):0;
+      float runningTasksAsPercent = capacity!= 0 ?
+          ((float)numRunningTasks * 100/capacity):0;
       StringBuffer sb = new StringBuffer();
-      sb.append("Guaranteed Capacity: " + guaranteedCapacity + "\n");
-      sb.append(String.format("Running tasks: %.1f%% of Guaranteed Capacity\n",
+      sb.append("Capacity: " + capacity + "\n");
+      sb.append(String.format("Running tasks: %.1f%% of Capacity\n",
           runningTasksAsPercent));
       // include info on active users
       if (numRunningTasks != 0) {
@@ -202,8 +128,8 @@
   private static class QueueSchedulingInfo {
     String queueName;
 
-    /** guaranteed capacity(%) is set in the config */ 
-    float guaranteedCapacityPercent = 0;
+    /** capacity(%) is set in the config */ 
+    float capacityPercent = 0;
     
     /** 
      * to handle user limits, we need to know how many users have jobs in 
@@ -215,13 +141,6 @@
     int ulMin;
     
     /**
-     * reclaim time limit (in msec). This time represents the SLA we offer 
-     * a queue - a queue gets back any lost capacity withing this period 
-     * of time.  
-     */ 
-    long reclaimTime;
-    
-    /**
      * We keep track of the JobQueuesManager only for reporting purposes 
      * (in toString()). 
      */
@@ -234,11 +153,10 @@
     TaskSchedulingInfo reduceTSI;
     
     public QueueSchedulingInfo(String queueName, float gcPercent, 
-        int ulMin, long reclaimTime, JobQueuesManager jobQueuesManager) {
+        int ulMin, JobQueuesManager jobQueuesManager) {
       this.queueName = new String(queueName);
-      this.guaranteedCapacityPercent = gcPercent;
+      this.capacityPercent = gcPercent;
       this.ulMin = ulMin;
-      this.reclaimTime = reclaimTime;
       this.jobQueuesManager = jobQueuesManager;
       this.mapTSI = new TaskSchedulingInfo();
       this.reduceTSI = new TaskSchedulingInfo();
@@ -253,12 +171,10 @@
       StringBuffer sb = new StringBuffer();
       sb.append("Queue configuration\n");
       //sb.append("Name: " + queueName + "\n");
-      sb.append("Guaranteed Capacity Percentage: ");
-      sb.append(guaranteedCapacityPercent);
+      sb.append("Capacity Percentage: ");
+      sb.append(capacityPercent);
       sb.append("%\n");
       sb.append(String.format("User Limit: %d%s\n",ulMin, "%"));
-      sb.append(String.format("Reclaim Time limit: %s\n", 
-          StringUtils.formatTime(reclaimTime)));
       sb.append(String.format("Priority Supported: %s\n",
           (jobQueuesManager.doesQueueSupportPriorities(queueName))?
               "YES":"NO"));
@@ -300,9 +216,8 @@
     public String toString(){
       // note that we do not call updateQSIObjects() here for performance
       // reasons. This means that the data we print out may be slightly
-      // stale. This data is updated whenever assignTasks() is called, or
-      // whenever the reclaim capacity thread runs, which should be fairly
-      // often. If neither of these happen, the data gets stale. If we see
+      // stale. This data is updated whenever assignTasks() is called
+      // If this doesn't happen, the data gets stale. If we see
       // this often, we may need to detect this situation and call 
       // updateQSIObjects(), or just call it each time. 
       return scheduler.getDisplayInfo(queueName);
@@ -372,14 +287,11 @@
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException; 
     abstract int getPendingTasks(JobInProgress job);
-    abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
 
     /**
      * List of QSIs for assigning tasks.
-     * This list is ordered such that queues that need to reclaim capacity
-     * sooner, come before queues that don't. For queues that don't, they're
-     * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which
+     * Queues are ordered by a ratio of (# of running tasks)/capacity, which
      * indicates how much 'free space' the queue has, or how much it is over
      * capacity. This ordered list is iterated over, when assigning tasks.
      */  
@@ -396,34 +308,15 @@
       public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
         TaskSchedulingInfo t1 = getTSI(q1);
         TaskSchedulingInfo t2 = getTSI(q2);
-        // if one queue needs to reclaim something and the other one doesn't, 
-        // the former is first
-        if ((0 == t1.reclaimList.size()) && (0 != t2.reclaimList.size())) {
-          return 1;
-        }
-        else if ((0 != t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
-          return -1;
-        }
-        else if ((0 == t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
-          // neither needs to reclaim. 
-          // look at how much capacity they've filled. Treat a queue with gc=0 
-          // equivalent to a queue running at capacity
-          double r1 = (0 == t1.guaranteedCapacity)? 1.0f: 
-            (double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
-          double r2 = (0 == t2.guaranteedCapacity)? 1.0f:
-            (double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
-          if (r1<r2) return -1;
-          else if (r1>r2) return 1;
-          else return 0;
-        }
-        else {
-          // both have to reclaim. Look at which one needs to reclaim earlier
-          long tm1 = t1.reclaimList.get(0).whenToKill;
-          long tm2 = t2.reclaimList.get(0).whenToKill;
-          if (tm1<tm2) return -1;
-          else if (tm1>tm2) return 1;
-          else return 0;
-        }
+        // look at how much capacity they've filled. Treat a queue with gc=0 
+        // equivalent to a queue running at capacity
+        double r1 = (0 == t1.capacity)? 1.0f:
+          (double)t1.numRunningTasks/(double)t1.capacity;
+        double r2 = (0 == t2.capacity)? 1.0f:
+          (double)t2.numRunningTasks/(double)t2.capacity;
+        if (r1<r2) return -1;
+        else if (r1>r2) return 1;
+        else return 0;
       }
     }
     // subclass for map and reduce comparators
@@ -454,197 +347,19 @@
       Collections.sort(qsiForAssigningTasks, queueComparator);
     }
     
-    /** 
-     * Periodically, we walk through our queues to do the following: 
-     * a. Check if a queue needs to reclaim any resources within a period
-     * of time (because it's running below capacity and more tasks are
-     * waiting)
-     * b. Check if a queue hasn't received enough of the resources it needed
-     * to be reclaimed and thus tasks need to be killed.
-     * The caller is responsible for ensuring that the QSI objects and the 
-     * collections are up-to-date.
-     * 
-     * Make sure that we do not make any calls to scheduler.taskTrackerManager
-     * as this can result in a deadlock (see HADOOP-4977). 
-     */
-    private synchronized void reclaimCapacity(int nextHeartbeatInterval) {
-      int tasksToKill = 0;
-      
-      QueueSchedulingInfo lastQsi = 
-        qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
-      TaskSchedulingInfo lastTsi = getTSI(lastQsi);
-      long currentTime = scheduler.clock.getTime();
-      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
-        TaskSchedulingInfo tsi = getTSI(qsi);
-        if (tsi.guaranteedCapacity <= 0) {
-          // no capacity, hence nothing can be reclaimed.
-          continue;
-        }
-        // is there any resource that needs to be reclaimed? 
-        if ((!tsi.reclaimList.isEmpty()) &&  
-            (tsi.reclaimList.getFirst().whenToKill < 
-              currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) {
-          // make a note of how many tasks to kill to claim resources
-          tasksToKill += tsi.reclaimList.getFirst().currentAmount;
-          // move this to expiry list
-          ReclaimedResource r = tsi.reclaimList.remove();
-          tsi.reclaimExpireList.add(r);
-        }
-        // is there any resource that needs to be expired?
-        if ((!tsi.reclaimExpireList.isEmpty()) && 
-            (tsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
-          ReclaimedResource r = tsi.reclaimExpireList.remove();
-          tsi.numReclaimedResources -= r.originalAmount;
-        }
-        // do we need to reclaim a resource later? 
-        // if no queue is over capacity, there's nothing to reclaim
-        if (lastTsi.numRunningTasks <= lastTsi.guaranteedCapacity) {
-          continue;
-        }
-        if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
-          // usedCap is how much capacity is currently accounted for
-          int usedCap = tsi.numRunningTasks + tsi.numReclaimedResources;
-          // see if we have remaining capacity and if we have enough pending 
-          // tasks to use up remaining capacity
-          if ((usedCap < tsi.guaranteedCapacity) && 
-              ((tsi.numPendingTasks - tsi.numReclaimedResources)>0)) {
-            // create a request for resources to be reclaimed
-            int amt = Math.min((tsi.guaranteedCapacity-usedCap), 
-                (tsi.numPendingTasks - tsi.numReclaimedResources));
-            // create a resource object that needs to be reclaimed some time
-            // in the future
-            long whenToKill = qsi.reclaimTime - 
-              (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * 
-                  nextHeartbeatInterval);
-            if (whenToKill < 0) whenToKill = 0;
-            tsi.reclaimList.add(new ReclaimedResource(amt, 
-                currentTime + qsi.reclaimTime, 
-                currentTime + whenToKill));
-            tsi.numReclaimedResources += amt;
-            LOG.debug("Queue " + qsi.queueName + " needs to reclaim " + 
-                amt + " resources");
-          }
-        }
-      }
-      // kill tasks to reclaim capacity
-      if (0 != tasksToKill) {
-        killTasks(tasksToKill);
-      }
-    }
-
-    // kill 'tasksToKill' tasks 
-    private void killTasks(int tasksToKill)
-    {
-      /* 
-       * There are a number of fair ways in which one can figure out how
-       * many tasks to kill from which queue, so that the total number of
-       * tasks killed is equal to 'tasksToKill'.
-       * Maybe the best way is to keep a global ordering of running tasks
-       * and kill the ones that ran last, irrespective of what queue or 
-       * job they belong to. 
-       * What we do here is look at how many tasks is each queue running
-       * over capacity, and use that as a weight to decide how many tasks
-       * to kill from that queue.
-       */ 
-      
-      // first, find out all queues over capacity
-      int loc;
-      for (loc=0; loc<qsiForAssigningTasks.size(); loc++) {
-        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(loc);
-        if (getTSI(qsi).numRunningTasks > getTSI(qsi).guaranteedCapacity) {
-          // all queues from here onwards are running over cap
-          break;
-        }
-      }
-      // if some queue needs to reclaim cap, there must be at least one queue
-      // over cap. But check, just in case. 
-      if (loc == qsiForAssigningTasks.size()) {
-        LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill + 
-            " tasks but there is no queue over capacity.");
-        return;
-      }
-      // calculate how many total tasks are over cap
-      int tasksOverCap = 0;
-      for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
-        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
-        tasksOverCap += 
-          (getTSI(qsi).numRunningTasks - getTSI(qsi).guaranteedCapacity);
-      }
-      // now kill tasks from each queue
-      for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
-        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
-        killTasksFromQueue(qsi, (int)Math.round(
-            ((double)(getTSI(qsi).numRunningTasks - 
-                getTSI(qsi).guaranteedCapacity))*
-            tasksToKill/(double)tasksOverCap));
-      }
-    }
-
-    // kill 'tasksToKill' tasks from queue represented by qsi
-    private void killTasksFromQueue(QueueSchedulingInfo qsi, int tasksToKill) {
-      // we start killing as many tasks as possible from the jobs that started
-      // last. This way, we let long-running jobs complete faster.
-      int tasksKilled = 0;
-      JobInProgress jobs[] = scheduler.jobQueuesManager.
-        getRunningJobQueue(qsi.queueName).toArray(new JobInProgress[0]);
-      for (int i=jobs.length-1; i>=0; i--) {
-        if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) {
-          continue;
-        }
-        tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled);
-        if (tasksKilled >= tasksToKill) break;
-      }
-    }
-   
-    // return the TaskAttemptID of the running task, if any, that has made 
-    // the least progress.
-    TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) {
-      double leastProgress = 1;
-      TaskAttemptID tID = null;
-      for (Iterator<TaskAttemptID> it = 
-        tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
-        TaskAttemptID taskid = it.next();
-        TaskStatus status = tip.getTaskStatus(taskid);
-        if (status.getRunState() == TaskStatus.State.RUNNING) {
-          if (status.getProgress() < leastProgress) {
-            leastProgress = status.getProgress();
-            tID = taskid;
-          }
-        }
-      }
-      return tID;
-    }
-    
-    // called when a task is allocated to queue represented by qsi. 
-    // update our info about reclaimed resources
-    private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
-      TaskSchedulingInfo tsi = getTSI(qsi);
-      // if we needed to reclaim resources, we have reclaimed one
-      if (tsi.reclaimList.isEmpty()) {
-        return;
-      }
-      ReclaimedResource res = tsi.reclaimList.getFirst();
-      res.currentAmount--;
-      if (0 == res.currentAmount) {
-        // move this resource to the expiry list
-        ReclaimedResource r = tsi.reclaimList.remove();
-        tsi.reclaimExpireList.add(r);
-      }
-    }
-
     private synchronized void updateCollectionOfQSIs() {
       Collections.sort(qsiForAssigningTasks, queueComparator);
     }
 
 
     private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) {
-      // what is our current capacity? It's GC if we're running below GC. 
-      // If we're running over GC, then its #running plus 1 (which is the 
+      // what is our current capacity? It's capacity if we're running below capacity.
+      // If we're running over capacity, then its #running plus 1 (which is the
       // extra slot we're getting). 
       int currentCapacity;
       TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
-        currentCapacity = tsi.guaranteedCapacity;
+      if (tsi.numRunningTasks < tsi.capacity) {
+        currentCapacity = tsi.capacity;
       }
       else {
         currentCapacity = tsi.numRunningTasks+1;
@@ -760,7 +475,7 @@
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         // we may have queues with gc=0. We shouldn't look at jobs from 
         // these queues
-        if (0 == getTSI(qsi).guaranteedCapacity) {
+        if (0 == getTSI(qsi).capacity) {
           continue;
         }
         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -772,8 +487,6 @@
 
         // if we find a task, return
         if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) {
-          // we have a task. Update reclaimed resource info
-          updateReclaimedResources(qsi);
           return tlr;
         }
         // if there was a memory mismatch, return
@@ -795,8 +508,8 @@
         Collection<JobInProgress> runJobs = 
           scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
         s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
-            tsi.numRunningTasks + ", gc=" + tsi.guaranteedCapacity + 
-            ", wait=" + tsi.numPendingTasks + ", run jobs="+ runJobs.size() + 
+            tsi.numRunningTasks + ", gc=" + tsi.capacity
+            + ", run jobs="+ runJobs.size() +
             "*** ");
       }
       LOG.debug(s);
@@ -830,55 +543,7 @@
     int getPendingTasks(JobInProgress job) {
       return job.pendingMaps();
     }
-    int killTasksFromJob(JobInProgress job, int tasksToKill) {
-      /*
-       * We'd like to kill tasks that ran the last, or that have made the
-       * least progress.
-       * Ideally, each job would have a list of tasks, sorted by start 
-       * time or progress. That's a lot of state to keep, however. 
-       * For now, we do something a little different. We first try and kill
-       * non-local tasks, as these can be run anywhere. For each TIP, we 
-       * kill the task that has made the least progress, if the TIP has
-       * more than one active task. 
-       * We then look at tasks in runningMapCache.
-       */
-      int tasksKilled = 0;
-      
-      /* 
-       * For non-local running maps, we 'cheat' a bit. We know that the set
-       * of non-local running maps has an insertion order such that tasks 
-       * that ran last are at the end. So we iterate through the set in 
-       * reverse. This is OK because even if the implementation changes, 
-       * we're still using generic set iteration and are no worse of.
-       */ 
-      TaskInProgress[] tips = 
-        job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]);
-      for (int i=tips.length-1; i>=0; i--) {
-        // pick the tast attempt that has progressed least
-        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
-        if (null != tid) {
-          if (tips[i].killTask(tid, false)) {
-            if (++tasksKilled >= tasksToKill) {
-              return tasksKilled;
-            }
-          }
-        }
-      }
-      // now look at other running tasks
-      for (Set<TaskInProgress> s: job.getRunningMapCache().values()) {
-        for (TaskInProgress tip: s) {
-          TaskAttemptID tid = getRunningTaskWithLeastProgress(tip);
-          if (null != tid) {
-            if (tip.killTask(tid, false)) {
-              if (++tasksKilled >= tasksToKill) {
-                return tasksKilled;
-              }
-            }
-          }
-        }
-      }
-      return tasksKilled;
-    }
+
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.mapTSI;
     }
@@ -911,30 +576,7 @@
     int getPendingTasks(JobInProgress job) {
       return job.pendingReduces();
     }
-    int killTasksFromJob(JobInProgress job, int tasksToKill) {
-      /* 
-       * For reduces, we 'cheat' a bit. We know that the set
-       * of running reduces has an insertion order such that tasks 
-       * that ran last are at the end. So we iterate through the set in 
-       * reverse. This is OK because even if the implementation changes, 
-       * we're still using generic set iteration and are no worse of.
-       */ 
-      int tasksKilled = 0;
-      TaskInProgress[] tips = 
-        job.getRunningReduces().toArray(new TaskInProgress[0]);
-      for (int i=tips.length-1; i>=0; i--) {
-        // pick the tast attempt that has progressed least
-        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
-        if (null != tid) {
-          if (tips[i].killTask(tid, false)) {
-            if (++tasksKilled >= tasksToKill) {
-              return tasksKilled;
-            }
-          }
-        }
-      }
-      return tasksKilled;
-    }
+
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.reduceTSI;
     }
@@ -953,12 +595,6 @@
   /** name of the default queue. */ 
   static final String DEFAULT_QUEUE_NAME = "default";
   
-  /** how often does redistribution thread run (in msecs)*/
-  private static long RECLAIM_CAPACITY_INTERVAL;
-  /** we start killing tasks to reclaim capacity when we have so many 
-   * heartbeats left. */
-  private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
-
   static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
   protected JobQueuesManager jobQueuesManager;
   protected CapacitySchedulerConf schedConf;
@@ -966,33 +602,6 @@
   private boolean started = false;
   
   /**
-   * Used to distribute/reclaim excess capacity among queues
-   */ 
-  class ReclaimCapacity implements Runnable {
-    public ReclaimCapacity() {
-    }
-    public void run() {
-      while (true) {
-        try {
-          Thread.sleep(RECLAIM_CAPACITY_INTERVAL);
-          if (stopReclaim) { 
-            break;
-          }
-          reclaimCapacity();
-        } catch (InterruptedException t) {
-          break;
-        } catch (Throwable t) {
-          LOG.error("Error in redistributing capacity:\n" +
-                    StringUtils.stringifyException(t));
-        }
-      }
-    }
-  }
-  private Thread reclaimCapacityThread = null;
-  /** variable to indicate that thread should stop */
-  private boolean stopReclaim = false;
-
-  /**
    * A clock class - can be mocked out for testing.
    */
   static class Clock {
@@ -1067,9 +676,6 @@
 
     initializeMemoryRelatedConf();
     
-    RECLAIM_CAPACITY_INTERVAL = schedConf.getReclaimCapacityInterval();
-    RECLAIM_CAPACITY_INTERVAL *= 1000;
-
     // read queue info from config file
     QueueManager queueManager = taskTrackerManager.getQueueManager();
     Set<String> queues = queueManager.getQueues();
@@ -1078,20 +684,19 @@
       throw new IllegalStateException("System has no queue configured");
     }
 
-    Set<String> queuesWithoutConfiguredGC = new HashSet<String>();
+    Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
     float totalCapacity = 0.0f;
     for (String queueName: queues) {
-      float gc = schedConf.getGuaranteedCapacity(queueName); 
+      float gc = schedConf.getCapacity(queueName);
       if(gc == -1.0) {
-        queuesWithoutConfiguredGC.add(queueName);
+        queuesWithoutConfiguredCapacity.add(queueName);
       }else {
         totalCapacity += gc;
       }
       int ulMin = schedConf.getMinimumUserLimitPercent(queueName); 
-      long reclaimTimeLimit = schedConf.getReclaimTimeLimit(queueName) * 1000;
       // create our QSI and add to our hashmap
       QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc, 
-          ulMin, reclaimTimeLimit, jobQueuesManager);
+                                                    ulMin, jobQueuesManager);
       queueInfoMap.put(queueName, qsi);
 
       // create the queues of job objects
@@ -1105,11 +710,11 @@
     }
     float remainingQuantityToAllocate = 100 - totalCapacity;
     float quantityToAllocate = 
-      remainingQuantityToAllocate/queuesWithoutConfiguredGC.size();
-    for(String queue: queuesWithoutConfiguredGC) {
+      remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
+    for(String queue: queuesWithoutConfiguredCapacity) {
       QueueSchedulingInfo qsi = queueInfoMap.get(queue); 
-      qsi.guaranteedCapacityPercent = quantityToAllocate;
-      schedConf.setGuaranteedCapacity(queue, quantityToAllocate);
+      qsi.capacityPercent = quantityToAllocate;
+      schedConf.setCapacity(queue, quantityToAllocate);
     }    
     
     // check if there's a queue with the default name. If not, we quit.
@@ -1137,19 +742,9 @@
     initializationPoller.setDaemon(true);
     initializationPoller.start();
 
-    // start thread for redistributing capacity if we have more than 
-    // one queue
-    if (queueInfoMap.size() > 1) {
-      this.reclaimCapacityThread = 
-        new Thread(new ReclaimCapacity(),"reclaimCapacity");
-      this.reclaimCapacityThread.start();
-    }
-    else {
-      LOG.info("Only one queue present. Reclaim capacity thread not started.");
-    }
-    
     started = true;
-    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  }
+    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  
+  }
   
   /** mostly for testing purposes */
   void setInitializationPoller(JobInitializationPoller p) {
@@ -1163,8 +758,6 @@
       taskTrackerManager.removeJobInProgressListener(
           jobQueuesManager);
     }
-    // tell the reclaim thread to stop
-    stopReclaim = true;
     started = false;
     initializationPoller.terminate();
     super.terminate();
@@ -1176,27 +769,6 @@
   }
 
   /**
-   * Reclaim capacity for both map & reduce tasks. 
-   * Do not make this synchronized, since we call taskTrackerManager 
-   * (see HADOOP-4977). 
-   */
-  void reclaimCapacity() {
-    // get the cluster capacity
-    ClusterStatus c = taskTrackerManager.getClusterStatus();
-    int mapClusterCapacity = c.getMaxMapTasks();
-    int reduceClusterCapacity = c.getMaxReduceTasks();
-    int nextHeartbeatInterval = taskTrackerManager.getNextHeartbeatInterval();
-    // update the QSI objects
-    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
-    // update the qsi collections, since we depend on their ordering 
-    mapScheduler.updateCollectionOfQSIs();
-    reduceScheduler.updateCollectionOfQSIs();
-    // now, reclaim
-    mapScheduler.reclaimCapacity(nextHeartbeatInterval);
-    reduceScheduler.reclaimCapacity(nextHeartbeatInterval);
-  }
-  
-  /**
    * provided for the test classes
    * lets you update the QSI objects and sorted collections
    */ 
@@ -1216,31 +788,27 @@
    * to make scheduling decisions. For example, we don't need an exact count
    * of numRunningTasks. Once we count upto the grid capacity, any
    * number beyond that will make no difference.
-   * 
-   * The pending task count is only required in reclaim capacity. So 
-   * if the computation becomes expensive, we can add a boolean to 
-   * denote if pending task computation is required or not.
-   * 
+   *
    **/
-  private synchronized void updateQSIObjects(int mapClusterCapacity, 
+  private synchronized void updateQSIObjects(int mapClusterCapacity,
       int reduceClusterCapacity) {
-    // if # of slots have changed since last time, update. 
+    // if # of slots have changed since last time, update.
     // First, compute whether the total number of TT slots have changed
     for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
-      // compute new GCs, if TT slots have changed
+      // compute new capacities, if TT slots have changed
       if (mapClusterCapacity != prevMapClusterCapacity) {
-        qsi.mapTSI.guaranteedCapacity =
-          (int)(qsi.guaranteedCapacityPercent*mapClusterCapacity/100);
+        qsi.mapTSI.capacity =
+          (int)(qsi.capacityPercent*mapClusterCapacity/100);
       }
       if (reduceClusterCapacity != prevReduceClusterCapacity) {
-        qsi.reduceTSI.guaranteedCapacity =
-          (int)(qsi.guaranteedCapacityPercent*reduceClusterCapacity/100);
+        qsi.reduceTSI.capacity =
+          (int)(qsi.capacityPercent*reduceClusterCapacity/100);
       }
       // reset running/pending tasks, tasks per user
       qsi.mapTSI.resetTaskVars();
       qsi.reduceTSI.resetTaskVars();
       // update stats on running jobs
-      for (JobInProgress j: 
+      for (JobInProgress j:
         jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
@@ -1249,62 +817,36 @@
         int runningReduces = j.runningReduces();
         qsi.mapTSI.numRunningTasks += runningMaps;
         qsi.reduceTSI.numRunningTasks += runningReduces;
-        Integer i = 
+        Integer i =
           qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
-        qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(), 
+        qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
             i+runningMaps);
         i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
-        qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(), 
+        qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
             i+runningReduces);
-        qsi.mapTSI.numPendingTasks += j.pendingMaps();
-        qsi.reduceTSI.numPendingTasks += j.pendingReduces();
         LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
-            j.runningMaps() + ", run(r) = " + j.runningReduces() + 
-            ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
-            j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
-            ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + 
-            j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks 
-            + ", total(m) = " + j.numMapTasks + ", total(r) = " + 
+            j.runningMaps() + ", run(r) = " + j.runningReduces() +
+            ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
+            j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
+            ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
+            j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
+            + ", total(m) = " + j.numMapTasks + ", total(r) = " +
             j.numReduceTasks);
-        /* 
+        /*
          * it's fine walking down the entire list of running jobs - there
          * probably will not be many, plus, we may need to go through the
          * list to compute numRunningTasksByUser. If this is expensive, we
          * can keep a list of running jobs per user. Then we only need to
          * consider the first few jobs per user.
-         */ 
-      }
-      
-      //update stats on waiting jobs
-      for(JobInProgress j: jobQueuesManager.getWaitingJobs(qsi.queueName)) {
-        // pending tasks
-        if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) &&
-            (qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) {
-          // that's plenty. no need for more computation
-          break;
-        }
-        /*
-         * Consider only the waiting jobs in the job queue. Job queue can
-         * contain:
-         * 1. Jobs which are in running state but not scheduled
-         * (these would also be present in running queue), the pending 
-         * task count of these jobs is computed when scheduler walks
-         * through running job queue.
-         * 2. Jobs which are killed by user, but waiting job initialization
-         * poller to walk through the job queue to clean up killed jobs.
          */
-        if (j.getStatus().getRunState() == JobStatus.PREP) {
-          qsi.mapTSI.numPendingTasks += j.pendingMaps();
-          qsi.reduceTSI.numPendingTasks += j.pendingReduces();
-        }
       }
     }
-    
+
     prevMapClusterCapacity = mapClusterCapacity;
     prevReduceClusterCapacity = reduceClusterCapacity;
   }
 
-  /* 
+  /*
    * The grand plan for assigning a task. 
    * First, decide whether a Map or Reduce task should be given to a TT 
    * (if the TT can accept either). 



Mime
View raw message