hadoop-common-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r786377 [1/2] - in /hadoop/core/branches/HADOOP-4687/mapred/src/contrib: ./ capacity-scheduler/ capacity-scheduler/src/java/org/apache/hadoop/mapred/ capacity-scheduler/src/test/org/apache/hadoop/mapred/ data_join/ dynamic-scheduler/ fairsc...
Date: Fri, 19 Jun 2009 05:42:54 GMT
Author: cdouglas
Date: Fri Jun 19 05:42:53 2009
New Revision: 786377

URL: http://svn.apache.org/viewvc?rev=786377&view=rev
Log:
HADOOP-4687. Merge mapred/contrib changes -r 776174:785643

Modified:
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build-contrib.xml   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml   (contents, props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/data_join/   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/dynamic-scheduler/   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/fairscheduler/   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/index/   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/mrunit/   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/sqoop/   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/streaming/   (props changed)
    hadoop/core/branches/HADOOP-4687/mapred/src/contrib/vaidya/   (props changed)

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
+/hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml Fri Jun 19 05:42:53 2009
@@ -54,6 +54,7 @@
       <fileset dir="." includes="fairscheduler/build.xml"/> 
      <fileset dir="." includes="capacity-scheduler/build.xml"/>  
       <fileset dir="." includes="mrunit/build.xml"/> 
+      <fileset dir="." includes="dynamic-scheduler/build.xml"/>
     </subant>
     <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
     <fail if="testsfailed">Tests failed!</fail>

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
+/hadoop/core/trunk/src/contrib/build.xml:776175-786373

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
+/hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml Fri Jun 19 05:42:53 2009
@@ -24,7 +24,7 @@
     <artifact conf="master"/>
   </publications>
   <dependencies>
-   <dependency org="commons-cli"
+    <dependency org="commons-cli"
       name="commons-cli"
       rev="${commons-cli.version}"
       conf="common->default"/>

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Jun 19 05:42:53 2009
@@ -164,10 +164,10 @@
    * Sets the capacity of the given queue.
    * 
    * @param queue name of the queue
-   * @param gc percent of the cluster for the queue.
+   * @param capacity percent of the cluster for the queue.
    */
-  public void setCapacity(String queue,float gc) {
-    rmConf.setFloat(toFullPropertyName(queue, "capacity"),gc);
+  public void setCapacity(String queue,float capacity) {
+    rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity);
   }
   
   /**
@@ -351,44 +351,4 @@
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
-
-  /**
-   * Get the upper limit on the maximum physical memory that can be specified by
-   * a job.
-   * 
-   * @return upper limit for max pmem for tasks.
-   */
-  public long getLimitMaxPmemForTasks() {
-    return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
-        JobConf.DISABLED_MEMORY_LIMIT);
-  }
-
-  /**
-   * Get the upper limit on the maximum physical memory that can be specified by
-   * a job.
-   * 
-   * @param value
-   */
-  public void setLimitMaxPmemForTasks(long value) {
-    rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
-  }
-
-  /**
-   * Get cluster-wide default percentage of pmem in vmem.
-   * 
-   * @return cluster-wide default percentage of pmem in vmem.
-   */
-  public float getDefaultPercentOfPmemInVmem() {
-    return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
-        JobConf.DISABLED_MEMORY_LIMIT);
-  }
-
-  /**
-   * Set cluster-wide default percentage of pmem in vmem.
-   * 
-   * @param value
-   */
-  public void setDefaultPercentOfPmemInVmem(float value) {
-    rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
-  }
 }
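
To make the renamed configuration accessors concrete, here is a minimal sketch (not part of this commit) of how setCapacity/getCapacity behave against a plain Hadoop Configuration. The property-name prefix below is an assumption standing in for toFullPropertyName(queue, "capacity"); the -1.0 default mirrors the "not configured" check the scheduler performs in its start-up code later in this patch.

import org.apache.hadoop.conf.Configuration;

public class QueueCapacitySketch {

  // Assumed prefix, standing in for toFullPropertyName(); the real one
  // lives in CapacitySchedulerConf.
  private static final String PREFIX = "mapred.capacity-scheduler.queue.";

  private final Configuration rmConf = new Configuration();

  /** Mirrors setCapacity(queue, capacity): stores the percentage as a float. */
  public void setCapacity(String queue, float capacity) {
    rmConf.setFloat(PREFIX + queue + ".capacity", capacity);
  }

  /** Mirrors getCapacity(queue): -1.0 is treated by the scheduler as "not configured". */
  public float getCapacity(String queue) {
    return rmConf.getFloat(PREFIX + queue + ".capacity", -1.0f);
  }
}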

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Jun 19 05:42:53 2009
@@ -24,8 +24,6 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,8 +32,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
-import org.apache.hadoop.util.StringUtils;
-
 
 /**
  * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
@@ -78,14 +74,20 @@
 
   private static class TaskSchedulingInfo {
     /** 
-     * the actual gc, which depends on how many slots are available
+     * the actual capacity, which depends on how many slots are available
      * in the cluster at any given time. 
      */
     int capacity = 0;
     // number of running tasks
     int numRunningTasks = 0;
-    /** for each user, we need to keep track of number of running tasks */
-    Map<String, Integer> numRunningTasksByUser = 
+    // number of slots occupied by running tasks
+    int numSlotsOccupied = 0;
+
+    /**
+     * for each user, we need to keep track of number of slots occupied by
+     * running tasks
+     */
+    Map<String, Integer> numSlotsOccupiedByUser = 
       new HashMap<String, Integer>();
     
     /**
@@ -93,32 +95,41 @@
      */
     void resetTaskVars() {
       numRunningTasks = 0;
-      for (String s: numRunningTasksByUser.keySet()) {
-        numRunningTasksByUser.put(s, 0);
+      numSlotsOccupied = 0;
+      for (String s: numSlotsOccupiedByUser.keySet()) {
+        numSlotsOccupiedByUser.put(s, Integer.valueOf(0));
       }
     }
 
     /**
      * return information about the tasks
      */
-    public String toString(){
-      float runningTasksAsPercent = capacity!= 0 ?
-          ((float)numRunningTasks * 100/capacity):0;
+    @Override
+    public String toString() {
+      float occupiedSlotsAsPercent =
+          capacity != 0 ? ((float) numSlotsOccupied * 100 / capacity) : 0;
       StringBuffer sb = new StringBuffer();
-      sb.append("Capacity: " + capacity + "\n");
-      sb.append(String.format("Running tasks: %.1f%% of Capacity\n",
-          runningTasksAsPercent));
+      sb.append("Capacity: " + capacity + " slots\n");
+      sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
+          Integer.valueOf(numSlotsOccupied), Float
+              .valueOf(occupiedSlotsAsPercent)));
+      sb.append(String.format("Running tasks: %d\n", Integer
+          .valueOf(numRunningTasks)));
       // include info on active users
-      if (numRunningTasks != 0) {
+      if (numSlotsOccupied != 0) {
         sb.append("Active users:\n");
-        for (Map.Entry<String, Integer> entry: numRunningTasksByUser.entrySet()) {
+        for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
+            .entrySet()) {
           if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
             // user has no tasks running
             continue;
           }
-          sb.append("User '" + entry.getKey()+ "': ");
-          float p = (float)entry.getValue().intValue()*100/numRunningTasks;
-          sb.append(String.format("%.1f%% of running tasks\n", p));
+          sb.append("User '" + entry.getKey() + "': ");
+          int numSlotsOccupiedByThisUser = entry.getValue().intValue();
+          float p =
+              (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
+          sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
+              .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
         }
       }
       return sb.toString();
@@ -152,10 +163,10 @@
     TaskSchedulingInfo mapTSI;
     TaskSchedulingInfo reduceTSI;
     
-    public QueueSchedulingInfo(String queueName, float gcPercent, 
+    public QueueSchedulingInfo(String queueName, float capacityPercent, 
         int ulMin, JobQueuesManager jobQueuesManager) {
       this.queueName = new String(queueName);
-      this.capacityPercent = gcPercent;
+      this.capacityPercent = capacityPercent;
       this.ulMin = ulMin;
       this.jobQueuesManager = jobQueuesManager;
       this.mapTSI = new TaskSchedulingInfo();
@@ -164,13 +175,14 @@
     
     /**
      * return information about the queue
+     * @return a String representing the information about the queue.
      */
+    @Override
     public String toString(){
       // We print out the queue information first, followed by info
       // on map and reduce tasks and job info
       StringBuffer sb = new StringBuffer();
       sb.append("Queue configuration\n");
-      //sb.append("Name: " + queueName + "\n");
       sb.append("Capacity Percentage: ");
       sb.append(capacityPercent);
       sb.append("%\n");
@@ -278,16 +290,29 @@
 
     /** our TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
-    // can be replaced with a global type, if we have one
-    protected static enum TYPE {
-      MAP, REDUCE
-    }
-    protected TYPE type = null;
+    protected CapacityTaskScheduler.TYPE type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
-        JobInProgress job) throws IOException; 
+        JobInProgress job) throws IOException;
+
+    int getSlotsOccupied(JobInProgress job) {
+      return getRunningTasks(job) * getSlotsPerTask(job);
+    }
+
+    abstract int getClusterCapacity();
+    abstract int getSlotsPerTask(JobInProgress job);
+    abstract int getRunningTasks(JobInProgress job);
     abstract int getPendingTasks(JobInProgress job);
     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
+    /**
+     * To check if job has a speculative task on the particular tracker.
+     * 
+     * @param job job to check for speculative tasks.
+     * @param tts task tracker on which speculative task would run.
+     * @return true if there is a speculative task to run on the tracker.
+     */
+    abstract boolean hasSpeculativeTask(JobInProgress job, 
+        TaskTrackerStatus tts);
 
     /**
      * List of QSIs for assigning tasks.
@@ -308,12 +333,12 @@
       public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
         TaskSchedulingInfo t1 = getTSI(q1);
         TaskSchedulingInfo t2 = getTSI(q2);
-        // look at how much capacity they've filled. Treat a queue with gc=0 
-        // equivalent to a queue running at capacity
+        // look at how much capacity they've filled. Treat a queue with
+        // capacity=0 equivalent to a queue running at capacity
         double r1 = (0 == t1.capacity)? 1.0f:
-          (double)t1.numRunningTasks/(double)t1.capacity;
+          (double)t1.numSlotsOccupied/(double)t1.capacity;
         double r2 = (0 == t2.capacity)? 1.0f:
-          (double)t2.numRunningTasks/(double)t2.capacity;
+          (double)t2.numSlotsOccupied/(double)t2.capacity;
         if (r1<r2) return -1;
         else if (r1>r2) return 1;
         else return 0;
@@ -335,7 +360,17 @@
     protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
     // and this is the comparator to use
     protected QueueComparator queueComparator;
-   
+
+    // Returns queues sorted according to the QueueComparator.
+    // Mainly for testing purposes.
+    String[] getOrderedQueues() {
+      List<String> queues = new ArrayList<String>(qsiForAssigningTasks.size());
+      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
+        queues.add(qsi.queueName);
+      }
+      return queues.toArray(new String[queues.size()]);
+    }
+
     TaskSchedulingMgr(CapacityTaskScheduler sched) {
       scheduler = sched;
     }
@@ -352,24 +387,26 @@
     }
 
 
-    private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) {
-      // what is our current capacity? It's capacity if we're running below capacity.
-      // If we're running over capacity, then its #running plus 1 (which is the
-      // extra slot we're getting). 
+    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+      // what is our current capacity? It is equal to the queue-capacity if
+      // we're running below capacity. If we're running over capacity, then it's
+      // #running plus slotPerTask of the job (which is the number of extra
+      // slots we're getting).
       int currentCapacity;
       TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numRunningTasks < tsi.capacity) {
+      if (tsi.numSlotsOccupied < tsi.capacity) {
         currentCapacity = tsi.capacity;
       }
       else {
-        currentCapacity = tsi.numRunningTasks+1;
+        currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
       }
       int limit = Math.max((int)(Math.ceil((double)currentCapacity/
           (double)qsi.numJobsByUser.size())), 
           (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
-      if (tsi.numRunningTasksByUser.get(user) >= limit) {
-        LOG.debug("User " + user + " is over limit, num running tasks = " + 
-            tsi.numRunningTasksByUser.get(user) + ", limit = " + limit);
+      String user = j.getProfile().getUser();
+      if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
+        LOG.debug("User " + user + " is over limit, num slots occupied = " + 
+            tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit);
         return true;
       }
       else {
@@ -398,31 +435,37 @@
           continue;
         }
         // check if the job's user is over limit
-        if (isUserOverLimit(j.getProfile().getUser(), qsi)) {
+        if (isUserOverLimit(j, qsi)) {
           continue;
-        }
-        if (getPendingTasks(j) != 0) {
-          // Not accurate TODO:
-          // check if the job's memory requirements are met
-          if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
-            // We found a suitable job. Get task from it.
-            Task t = obtainNewTask(taskTracker, j);
-            if (t != null) {
-              // we're successful in getting a task
-              return TaskLookupResult.getTaskFoundResult(t);
-            }
+        } 
+        //If this job meets the memory requirements, ask the JobInProgress for
+        //a task to be scheduled on the task tracker.
+        //If we find a task, we pass it on.
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+            taskTracker)) {
+          // We found a suitable job. Get task from it.
+          Task t = obtainNewTask(taskTracker, j);
+          //if there is a task return it immediately.
+          if (t != null) {
+            // we're successful in getting a task
+            return TaskLookupResult.getTaskFoundResult(t);
+          } else {
+            //skip to the next job in the queue.
+            LOG.debug("Job " + j.getJobID().toString()
+                + " returned no tasks of type " + type);
+            continue;
           }
-          else {
-            // mem requirements not met or could not be computed for this TT
-            // Rather than look at the next job, 
-            // we return nothing to the TT, with the hope that we improve 
-            // chances of finding a suitable TT for this job. This lets us
-            // avoid starving jobs with high mem requirements.         
+        } else {
+          //If memory requirements don't match, check whether the job has
+          //either a pending or a speculative task. If it does, we block
+          //until this job's tasks get scheduled, so that high-memory jobs
+          //are not starved.
+          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
             return TaskLookupResult.getMemFailedResult();
-          }
-        }
+          } 
+        }//end of memory check block
         // if we're here, this job has no task to run. Look at the next job.
-      }
+      }//end of for loop
 
       // if we're here, we haven't found any task to run among all jobs in 
       // the queue. This could be because there is nothing to run, or that 
@@ -444,24 +487,29 @@
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        if (getPendingTasks(j) != 0) {
-          // Not accurate TODO:
-          // check if the job's memory requirements are met
-          if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
-            // We found a suitable job. Get task from it.
-            Task t = obtainNewTask(taskTracker, j);
-            if (t != null) {
-              // we're successful in getting a task
-              return TaskLookupResult.getTaskFoundResult(t);
-            }
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+            taskTracker)) {
+          // We found a suitable job. Get task from it.
+          Task t = obtainNewTask(taskTracker, j);
+          //if there is a task return it immediately.
+          if (t != null) {
+            // we're successful in getting a task
+            return TaskLookupResult.getTaskFoundResult(t);
+          } else {
+            //skip to the next job in the queue.
+            continue;
           }
-          else {
-            // mem requirements not met. 
+        } else {
+          //If memory requirements don't match, check whether the job has
+          //either a pending or a speculative task. If it does, we block
+          //until this job's tasks get scheduled, so that high-memory jobs
+          //are not starved.
+          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
             return TaskLookupResult.getMemFailedResult();
-          }
-        }
-        // if we're here, this job has no task to run. Look at the next job.
-      }
+          } 
+        }//end of memory check block
+      }//end of for loop
 
       // found nothing for this queue, look at the next one.
       String msg = "Found no task from the queue " + qsi.queueName;
@@ -473,8 +521,11 @@
     // The caller is responsible for ensuring that the QSI objects and the 
     // collections are up-to-date.
     private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+
+      printQSIs();
+
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        // we may have queues with gc=0. We shouldn't look at jobs from 
+        // we may have queues with capacity=0. We shouldn't look at jobs from 
         // these queues
         if (0 == getTSI(qsi).capacity) {
           continue;
@@ -500,20 +551,23 @@
       // nothing to give
       return TaskLookupResult.getNoTaskFoundResult();
     }
-    
+
     // for debugging.
     private void printQSIs() {
-      StringBuffer s = new StringBuffer();
-      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
-        TaskSchedulingInfo tsi = getTSI(qsi);
-        Collection<JobInProgress> runJobs = 
-          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
-        s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
-            tsi.numRunningTasks + ", gc=" + tsi.capacity
-            + ", run jobs="+ runJobs.size() +
-            "*** ");
+      if (LOG.isDebugEnabled()) {
+        StringBuffer s = new StringBuffer();
+        for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
+          TaskSchedulingInfo tsi = getTSI(qsi);
+          Collection<JobInProgress> runJobs =
+              scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+          s.append(String.format(" Queue '%s'(%s): runningTasks=%d, "
+              + "occupiedSlots=%d, capacity=%d, runJobs=%d", qsi.queueName,
+              this.type, Integer.valueOf(tsi.numRunningTasks), Integer
+                  .valueOf(tsi.numSlotsOccupied), Integer
+                  .valueOf(tsi.capacity), Integer.valueOf(runJobs.size())));
+        }
+        LOG.debug(s);
       }
-      LOG.debug(s);
     }
     
     /**
@@ -543,11 +597,14 @@
    * The scheduling algorithms for map tasks. 
    */
   private static class MapSchedulingMgr extends TaskSchedulingMgr {
-    MapSchedulingMgr(CapacityTaskScheduler dad) {
-      super(dad);
-      type = TaskSchedulingMgr.TYPE.MAP;
+
+    MapSchedulingMgr(CapacityTaskScheduler schedulr) {
+      super(schedulr);
+      type = CapacityTaskScheduler.TYPE.MAP;
       queueComparator = mapComparator;
     }
+
+    @Override
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
       ClusterStatus clusterStatus = 
@@ -556,31 +613,57 @@
       return job.obtainNewMapTask(taskTracker, numTaskTrackers, 
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
     }
+
+    @Override
     int getClusterCapacity() {
       return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks();
     }
+
+    @Override
     int getRunningTasks(JobInProgress job) {
       return job.runningMaps();
     }
+
+    @Override
     int getPendingTasks(JobInProgress job) {
       return job.pendingMaps();
     }
 
+    @Override
+    int getSlotsPerTask(JobInProgress job) {
+      long myVmem = job.getJobConf().getMemoryForMapTask();
+      return (int) (Math.ceil((float) myVmem
+          / (float) scheduler.getMemSizeForMapSlot()));
+    }
+
+    @Override
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.mapTSI;
     }
 
+    @Override
+    boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
+      //Check if job supports speculative map execution first then 
+      //check if job has speculative maps.
+      return (job.getJobConf().getMapSpeculativeExecution())&& (
+          hasSpeculativeTask(job.getMapTasks(), 
+              job.getStatus().mapProgress(), tts));
+    }
+
   }
 
   /**
    * The scheduling algorithms for reduce tasks. 
    */
   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
-    ReduceSchedulingMgr(CapacityTaskScheduler dad) {
-      super(dad);
-      type = TaskSchedulingMgr.TYPE.REDUCE;
+
+    ReduceSchedulingMgr(CapacityTaskScheduler schedulr) {
+      super(schedulr);
+      type = CapacityTaskScheduler.TYPE.REDUCE;
       queueComparator = reduceComparator;
     }
+
+    @Override
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
       ClusterStatus clusterStatus = 
@@ -589,19 +672,44 @@
       return job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
     }
+
+    @Override
     int getClusterCapacity() {
-      return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+      return scheduler.taskTrackerManager.getClusterStatus()
+          .getMaxReduceTasks();
     }
+
+    @Override
     int getRunningTasks(JobInProgress job) {
       return job.runningReduces();
     }
+
+    @Override
     int getPendingTasks(JobInProgress job) {
       return job.pendingReduces();
     }
 
+    @Override
+    int getSlotsPerTask(JobInProgress job) {
+      long myVmem = job.getJobConf().getMemoryForReduceTask();
+      return (int) (Math.ceil((float) myVmem
+          / (float) scheduler.getMemSizeForReduceSlot()));
+    }
+
+    @Override
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.reduceTSI;
     }
+
+    @Override
+    boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
+      //check if the job supports reduce speculative execution first then
+      //check if the job has speculative tasks.
+      return (job.getJobConf().getReduceSpeculativeExecution()) && (
+          hasSpeculativeTask(job.getReduceTasks(), 
+              job.getStatus().reduceProgress(), tts));
+    }
+
   }
   
   /** the scheduling mgrs for Map and Reduce tasks */ 
@@ -620,7 +728,10 @@
   protected CapacitySchedulerConf schedConf;
   /** whether scheduler has started or not */
   private boolean started = false;
-  
+
+  static String JOB_SCHEDULING_INFO_FORMAT_STRING =
+      "%s running map tasks using %d map slots,"
+          + " %s running reduce tasks using %d reduce slots.";
   /**
    * A clock class - can be mocked out for testing.
    */
@@ -629,13 +740,19 @@
       return System.currentTimeMillis();
     }
   }
+
+  // can be replaced with a global type, if we have one
+  protected static enum TYPE {
+    MAP, REDUCE
+  }
+
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
-  long limitMaxVmemForTasks;
-  long limitMaxPmemForTasks;
-  long defaultMaxVmPerTask;
-  float defaultPercentOfPmemInVmem;
+  private long memSizeForMapSlotOnJT;
+  private long memSizeForReduceSlotOnJT;
+  private long limitMaxMemForMapTasks;
+  private long limitMaxMemForReduceTasks;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -652,37 +769,55 @@
     this.schedConf = conf;
   }
 
-  /**
-   * Normalize the negative values in configuration
-   * 
-   * @param val
-   * @return normalized value
-   */
-  private long normalizeMemoryConfigValue(long val) {
-    if (val < 0) {
-      val = JobConf.DISABLED_MEMORY_LIMIT;
-    }
-    return val;
-  }
-
   private void initializeMemoryRelatedConf() {
-    limitMaxVmemForTasks =
-        normalizeMemoryConfigValue(conf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+    memSizeForMapSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    memSizeForReduceSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    LOG.info(String.format("Scheduler configured with "
+        + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
+        + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
+        + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long
+        .valueOf(memSizeForReduceSlotOnJT), Long
+        .valueOf(limitMaxMemForMapTasks), Long
+        .valueOf(limitMaxMemForReduceTasks)));
+  }
 
-    limitMaxPmemForTasks =
-        normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks());
+  long getMemSizeForMapSlot() {
+    return memSizeForMapSlotOnJT;
+  }
 
-    defaultMaxVmPerTask =
-        normalizeMemoryConfigValue(conf.getLong(
-            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
+  long getMemSizeForReduceSlot() {
+    return memSizeForReduceSlotOnJT;
+  }
+
+  long getLimitMaxMemForMapSlot() {
+    return limitMaxMemForMapTasks;
+  }
+
+  long getLimitMaxMemForReduceSlot() {
+    return limitMaxMemForReduceTasks;
+  }
 
-    defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem();
-    if (defaultPercentOfPmemInVmem < 0) {
-      defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
+  String[] getOrderedQueues(CapacityTaskScheduler.TYPE type) {
+    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      return mapScheduler.getOrderedQueues();
+    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      return reduceScheduler.getOrderedQueues();
     }
+    return null;
   }
 
   @Override
@@ -707,15 +842,15 @@
     Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
     float totalCapacity = 0.0f;
     for (String queueName: queues) {
-      float gc = schedConf.getCapacity(queueName);
-      if(gc == -1.0) {
+      float capacity = schedConf.getCapacity(queueName);
+      if(capacity == -1.0) {
         queuesWithoutConfiguredCapacity.add(queueName);
       }else {
-        totalCapacity += gc;
+        totalCapacity += capacity;
       }
       int ulMin = schedConf.getMinimumUserLimitPercent(queueName); 
       // create our QSI and add to our hashmap
-      QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc, 
+      QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, capacity, 
                                                     ulMin, jobQueuesManager);
       queueInfoMap.put(queueName, qsi);
 
@@ -829,29 +964,49 @@
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        int runningMaps = j.runningMaps();
-        int runningReduces = j.runningReduces();
-        qsi.mapTSI.numRunningTasks += runningMaps;
-        qsi.reduceTSI.numRunningTasks += runningReduces;
+
+        int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
+        int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
+        int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
+        int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
+        j.setSchedulingInfo(String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
+            Integer.valueOf(numMapsRunningForThisJob), Integer
+                .valueOf(numMapSlotsForThisJob), Integer
+                .valueOf(numReducesRunningForThisJob), Integer
+                .valueOf(numReduceSlotsForThisJob)));
+        qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
+        qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
+        qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
+        qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob;
         Integer i =
-          qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
-        qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
-            i+runningMaps);
-        i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
-        qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
-            i+runningReduces);
-        LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
-            j.runningMaps() + ", run(r) = " + j.runningReduces() +
-            ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
-            j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
-            ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
-            j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
-            + ", total(m) = " + j.numMapTasks + ", total(r) = " +
-            j.numReduceTasks);
+            qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
+        qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
+            Integer.valueOf(i.intValue() + numMapSlotsForThisJob));
+        i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
+        qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
+            Integer.valueOf(i.intValue() + numReduceSlotsForThisJob));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
+              + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
+              + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
+              + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
+              .getJobID().toString(), Integer
+              .valueOf(numMapsRunningForThisJob), Integer
+              .valueOf(numMapSlotsForThisJob), Integer
+              .valueOf(numReducesRunningForThisJob), Integer
+              .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
+              .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
+              .valueOf(j.failedMapTasks),
+              Integer.valueOf(j.failedReduceTasks), Integer
+                  .valueOf(j.speculativeMapTasks), Integer
+                  .valueOf(j.speculativeReduceTasks), Integer
+                  .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
+        }
+
         /*
          * it's fine walking down the entire list of running jobs - there
          * probably will not be many, plus, we may need to go through the
-         * list to compute numRunningTasksByUser. If this is expensive, we
+         * list to compute numSlotsOccupiedByUser. If this is expensive, we
          * can keep a list of running jobs per user. Then we only need to
          * consider the first few jobs per user.
          */
@@ -866,10 +1021,8 @@
    * The grand plan for assigning a task. 
    * First, decide whether a Map or Reduce task should be given to a TT 
    * (if the TT can accept either). 
-   * Next, pick a queue. We only look at queues that need a slot. Among
-   * these, we first look at queues whose ac is less than gc (queues that 
-   * gave up capacity in the past). Next, we look at any other queue that
-   * needs a slot. 
+   * Next, pick a queue. We only look at queues that need a slot. Among these,
+   * we first look at queues whose (# of running tasks)/capacity is the least.
    * Next, pick a job in a queue. we pick the job at the front of the queue
    * unless its user is over the user limit. 
    * Finally, given a job, pick a task from the job. 
@@ -920,14 +1073,12 @@
         // found a task; return
         return Collections.singletonList(tlr.getTask());
       }
-      else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
-        tlr.getLookUpStatus()) {
-        // return no task
-        return null;
-      }
       // if we didn't get any, look at map tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
-        tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+                                  == tlr.getLookUpStatus() ||
+                TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+                                  == tlr.getLookUpStatus())
+          && (maxMapTasks > currentMapTasks)) {
         mapScheduler.updateCollectionOfQSIs();
         tlr = mapScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -945,13 +1096,12 @@
         // found a task; return
         return Collections.singletonList(tlr.getTask());
       }
-      else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
-        tlr.getLookUpStatus()) {
-        return null;
-      }
       // if we didn't get any, look at reduce tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
-        tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+                                    == tlr.getLookUpStatus()
+                || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+                                    == tlr.getLookUpStatus())
+          && (maxReduceTasks > currentReduceTasks)) {
         reduceScheduler.updateCollectionOfQSIs();
         tlr = reduceScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -964,38 +1114,6 @@
     return null;
   }
 
-  /**
-   * Kill the job if it has invalid requirements and return why it is killed
-   * 
-   * @param job
-   * @return string mentioning why the job is killed. Null if the job has valid
-   *         requirements.
-   */
-  private String killJobIfInvalidRequirements(JobInProgress job) {
-    if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
-      return null;
-    }
-    if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
-        || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
-            .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
-      String msg =
-          job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
-              + job.getMaxPhysicalMemoryForTask()
-              + "pmem) exceeds the cluster's max-memory-limits ("
-              + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
-              + "pmem). Cannot run in this cluster, so killing it.";
-      LOG.warn(msg);
-      try {
-        taskTrackerManager.killJob(job.getJobID());
-        return msg;
-      } catch (IOException ioe) {
-        LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
-            + StringUtils.stringifyException(ioe));
-      }
-    }
-    return null;
-  }
-
   // called when a job is added
   synchronized void jobAdded(JobInProgress job) throws IOException {
     QueueSchedulingInfo qsi = 
@@ -1006,8 +1124,10 @@
     if (null == i) {
       i = 1;
       // set the count for running tasks to 0
-      qsi.mapTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
-      qsi.reduceTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
+      qsi.mapTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
+          Integer.valueOf(0));
+      qsi.reduceTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
+          Integer.valueOf(0));
     }
     else {
       i++;
@@ -1015,13 +1135,6 @@
     qsi.numJobsByUser.put(job.getProfile().getUser(), i);
     LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
               + job.getProfile().getUser() + ", user now has " + i + " jobs");
-
-    // Kill the job if it cannot run in the cluster because of invalid
-    // resource requirements.
-    String statusMsg = killJobIfInvalidRequirements(job);
-    if (statusMsg != null) {
-      throw new IOException(statusMsg);
-    }
   }
 
   // called when a job completes
@@ -1036,8 +1149,8 @@
     if (0 == i.intValue()) {
       qsi.numJobsByUser.remove(job.getProfile().getUser());
       // remove job footprint from our TSIs
-      qsi.mapTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
-      qsi.reduceTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
+      qsi.mapTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
+      qsi.reduceTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
       LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
     }
     else {
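
The central idea of this change is that a high-memory task occupies several slots instead of one. The following self-contained Java sketch (illustrative only, with hypothetical names, not part of the patch) reproduces the two computations introduced above: getSlotsPerTask(), which rounds a task's memory requirement up to whole slots, and the per-user limit computed in isUserOverLimit().

public class SlotAccountingSketch {

  /** ceil(taskMemoryMB / slotMemoryMB), as in getSlotsPerTask(). */
  static int slotsPerTask(long taskMemoryMB, long slotMemoryMB) {
    return (int) Math.ceil((float) taskMemoryMB / (float) slotMemoryMB);
  }

  /**
   * Per-user slot limit, as in isUserOverLimit(): below capacity the current
   * capacity is the configured capacity; over capacity it is the occupied
   * slots plus the slots the next task of this job needs.
   */
  static int userSlotLimit(int queueCapacity, int slotsOccupied,
      int slotsForNextTask, int numUsers, int ulMinPercent) {
    int currentCapacity = (slotsOccupied < queueCapacity)
        ? queueCapacity
        : slotsOccupied + slotsForNextTask;
    return Math.max(
        (int) Math.ceil((double) currentCapacity / (double) numUsers),
        (int) Math.ceil((double) (ulMinPercent * currentCapacity) / 100.0));
  }

  public static void main(String[] args) {
    // A 3072MB map task on a cluster with 1024MB map slots occupies 3 slots.
    System.out.println(slotsPerTask(3072, 1024));         // 3
    // 100-slot queue, 40 slots occupied, 2 users, 25% minimum user limit.
    System.out.println(userSlotLimit(100, 40, 3, 2, 25)); // 50
  }
}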

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Fri Jun 19 05:42:53 2009
@@ -30,111 +30,32 @@
     this.scheduler = capacityTaskScheduler;
   }
 
-  boolean isSchedulingBasedOnVmemEnabled() {
-    LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask
-        + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks);
-    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+  boolean isSchedulingBasedOnMemEnabled() {
+    if (scheduler.getLimitMaxMemForMapSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getLimitMaxMemForReduceSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getMemSizeForMapSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getMemSizeForReduceSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT) {
       return false;
     }
     return true;
   }
 
-  boolean isSchedulingBasedOnPmemEnabled() {
-    LOG.debug("defaultPercentOfPmemInVmem : "
-        + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : "
-        + scheduler.limitMaxPmemForTasks);
-    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Obtain the virtual memory allocated for a job's tasks.
-   * 
-   * If the job has a configured value for the max-virtual memory, that will be
-   * returned. Else, the cluster-wide default max-virtual memory for tasks is
-   * returned.
-   * 
-   * This method can only be called after
-   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
-   * 
-   * @param jConf JobConf of the job
-   * @return the virtual memory allocated for the job's tasks.
-   */
-  private long getVirtualMemoryForTask(JobConf jConf) {
-    long vMemForTask = jConf.getMaxVirtualMemoryForTask();
-    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      vMemForTask =
-          new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-              scheduler.defaultMaxVmPerTask);
-    }
-    return vMemForTask;
-  }
-
-  /**
-   * Obtain the physical memory allocated for a job's tasks.
-   * 
-   * If the job has a configured value for the max physical memory, that
-   * will be returned. Else, the cluster-wide default physical memory for
-   * tasks is returned.
-   * 
-   * This method can only be called after
-   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
-   * 
-   * @param jConf JobConf of the job
-   * @return the physical memory allocated for the job's tasks
-   */
-  private long getPhysicalMemoryForTask(JobConf jConf) {
-    long pMemForTask = jConf.getMaxPhysicalMemoryForTask();
-    if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      pMemForTask =
-          Math.round(getVirtualMemoryForTask(jConf)
-              * scheduler.defaultPercentOfPmemInVmem);
-    }
-    return pMemForTask;
-  }
-
-  static class Memory {
-    long vmem;
-    long pmem;
-
-    Memory(long vm, long pm) {
-      this.vmem = vm;
-      this.pmem = pm;
-    }
-  }
-
   /**
    * Find the memory that is already used by all the running tasks
    * residing on the given TaskTracker.
    * 
    * @param taskTracker
+   * @param taskType 
    * @return amount of memory that is used by the residing tasks,
    *          null if memory cannot be computed for some reason.
    */
-  private synchronized Memory getMemReservedForTasks(
-      TaskTrackerStatus taskTracker) {
-    boolean disabledVmem = false;
-    boolean disabledPmem = false;
-
-    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      disabledVmem = true;
-    }
-
-    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) {
-      disabledPmem = true;
-    }
-
-    if (disabledVmem && disabledPmem) {
-      return new Memory(JobConf.DISABLED_MEMORY_LIMIT,
-          JobConf.DISABLED_MEMORY_LIMIT);
-    }
-
+  synchronized Long getMemReservedForTasks(
+      TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
     long vmem = 0;
-    long pmem = 0;
 
     for (TaskStatus task : taskTracker.getTaskReports()) {
       // the following task states are one in which the slot is
@@ -142,12 +63,12 @@
       // accounted in used memory.
       if ((task.getRunState() == TaskStatus.State.RUNNING)
           || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        JobInProgress job = scheduler.taskTrackerManager.getJob(
-                                              task.getTaskID().getJobID());
+        JobInProgress job =
+            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID());
         if (job == null) {
           // This scenario can happen if a job was completed/killed
-          // and retired from JT's memory. In this state, we can ignore 
-          // the running task status and compute memory for the rest of 
+          // and retired from JT's memory. In this state, we can ignore
+          // the running task status and compute memory for the rest of
           // the tasks. However, any scheduling done with this computation
           // could result in over-subscribing of memory for tasks on this
           // TT (as the unaccounted for task is still running).
@@ -155,123 +76,99 @@
           // One of the ways of doing that is to return null from here
           // and check for null in the calling method.
           LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
-                    + "a running / commit pending task: " + task.getTaskID()
-                    + " but no corresponding job was found. "
-                    + "Maybe job was retired. Not computing "
-                    + "memory values for this TT.");
+              + "a running / commit pending task: " + task.getTaskID()
+              + " but no corresponding job was found. "
+              + "Maybe job was retired. Not computing "
+              + "memory values for this TT.");
           return null;
         }
-        
-        JobConf jConf =
-            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
-                .getJobConf();
-        if (!disabledVmem) {
-          vmem += getVirtualMemoryForTask(jConf);
-        }
-        if (!disabledPmem) {
-          pmem += getPhysicalMemoryForTask(jConf);
+
+        JobConf jConf = job.getJobConf();
+
+        // Get the memory "allotted" for this task by rounding off the job's
+        // tasks' memory limits to the nearest multiple of the slot-memory-size
+        // set on JT. This essentially translates to tasks of a high memory job
+        // using multiple slots.
+        long myVmem = 0;
+        if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+          myVmem = jConf.getMemoryForMapTask();
+          myVmem =
+              (long) (scheduler.getMemSizeForMapSlot() * Math
+                  .ceil((float) myVmem
+                      / (float) scheduler.getMemSizeForMapSlot()));
+        } else if (!task.getIsMap()
+            && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+          myVmem = jConf.getMemoryForReduceTask();
+          myVmem =
+              (long) (scheduler.getMemSizeForReduceSlot() * Math
+                  .ceil((float) myVmem
+                      / (float) scheduler.getMemSizeForReduceSlot()));
         }
+        vmem += myVmem;
       }
     }
 
-    return new Memory(vmem, pmem);
+    return Long.valueOf(vmem);
   }
 
   /**
-   * Check if a TT has enough pmem and vmem to run this job.
+   * Check if a TT has enough memory to run a task of the specified type from this job.
    * @param job
+   * @param taskType 
    * @param taskTracker
    * @return true if this TT has enough memory for this job. False otherwise.
    */
   boolean matchesMemoryRequirements(JobInProgress job,
-      TaskTrackerStatus taskTracker) {
+      CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) {
 
-    // ////////////// vmem based scheduling
-    if (!isSchedulingBasedOnVmemEnabled()) {
-      LOG.debug("One of the configuration parameters defaultMaxVmPerTask "
-          + "and limitMaxVmemPerTasks is not configured. Scheduling based "
-          + "on job's memory requirements is disabled, ignoring any value "
-          + "set by job.");
-      return true;
-    }
-
-    TaskTrackerStatus.ResourceStatus resourceStatus =
-        taskTracker.getResourceStatus();
-    long totalVMemOnTT = resourceStatus.getTotalVirtualMemory();
-    long reservedVMemOnTT = resourceStatus.getReservedTotalMemory();
-
-    if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT
-        || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
-      return true;
-    }
+    LOG.debug("Matching memory requirements of " + job.getJobID().toString()
+        + " for scheduling on " + taskTracker.trackerName);
 
-    if (reservedVMemOnTT > totalVMemOnTT) {
+    if (!isSchedulingBasedOnMemEnabled()) {
+      LOG.debug("Scheduling based on job's memory requirements is disabled."
+          + " Ignoring any value set by job.");
       return true;
     }
 
-    long jobVMemForTask = job.getMaxVirtualMemoryForTask();
-    if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      jobVMemForTask = scheduler.defaultMaxVmPerTask;
-    }
-
-    Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
-    if (memReservedForTasks == null) {
+    Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
+    if (memUsedOnTT == null) {
       // For some reason, maybe because we could not find the job
       // corresponding to a running task (as can happen if the job
       // is retired in between), we could not compute the memory state
       // on this TT. Treat this as an error, and fail memory
       // requirements.
-      LOG.info("Could not compute memory for taskTracker: " 
-                + taskTracker.getHost() + ". Failing memory requirements.");
+      LOG.info("Could not compute memory for taskTracker: "
+          + taskTracker.getHost() + ". Failing memory requirements.");
       return false;
     }
-    long vmemUsedOnTT = memReservedForTasks.vmem;
-    long pmemUsedOnTT = memReservedForTasks.pmem;
 
-    long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT;
+    long totalMemUsableOnTT = 0;
 
-    if (jobVMemForTask > freeVmemUsedOnTT) {
+    long memForThisTask = 0;
+    if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      memForThisTask = job.getJobConf().getMemoryForMapTask();
+      totalMemUsableOnTT =
+          scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks();
+    } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      memForThisTask = job.getJobConf().getMemoryForReduceTask();
+      totalMemUsableOnTT =
+          scheduler.getMemSizeForReduceSlot()
+              * taskTracker.getMaxReduceTasks();
+    }
+
+    long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
+    if (memForThisTask > freeMemOnTT) {
+      LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+          + freeMemOnTT + "). A " + taskType + " task from "
+          + job.getJobID().toString() + " cannot be scheduled on TT "
+          + taskTracker.trackerName);
       return false;
     }
 
-    // ////////////// pmem based scheduling
-
-    long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory();
-    long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory();
-    long jobPMemForTask = job.getMaxPhysicalMemoryForTask();
-    long freePmemUsedOnTT = 0;
-
-    if (isSchedulingBasedOnPmemEnabled()) {
-      if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT
-          || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
-        return true;
-      }
-
-      if (reservedPmemOnTT > totalPmemOnTT) {
-        return true;
-      }
-
-      if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-        jobPMemForTask =
-            Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem);
-      }
-
-      freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT;
-
-      if (jobPMemForTask > freePmemUsedOnTT) {
-        return false;
-      }
-    } else {
-      LOG.debug("One of the configuration parameters "
-          + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not "
-          + "configured. Scheduling based on job's physical memory "
-          + "requirements is disabled, ignoring any value set by job.");
-    }
-
-    LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = "
-        + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT
-        + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = "
-        + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
+    LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+        + freeMemOnTT + ". A " + taskType.toString() + " task from "
+        + job.getJobID().toString() + " matches memory requirements on TT "
+        + taskTracker.trackerName);
     return true;
   }
 }
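
For reference, here is a minimal sketch (not part of the patch, all names illustrative) of the simplified check that replaces the earlier vmem/pmem logic in MemoryMatcher: memory reserved on a tracker is each running task's requirement rounded up to a whole number of slots, usable memory is slot size times the number of slots of that type, and a task is schedulable only if its requirement fits in the difference. The 1024MB slot size in main() is an assumed example value.

public class MemoryMatchSketch {

  /** Round a task's memory up to whole slots, as in getMemReservedForTasks(). */
  static long roundToSlots(long taskMemoryMB, long slotMemoryMB) {
    return (long) (slotMemoryMB
        * Math.ceil((float) taskMemoryMB / (float) slotMemoryMB));
  }

  /** The final comparison made by matchesMemoryRequirements(). */
  static boolean fits(long taskMemoryMB, long slotMemoryMB, int maxSlotsOnTT,
      long memUsedOnTT) {
    long totalUsableOnTT = slotMemoryMB * maxSlotsOnTT;
    long freeMemOnTT = totalUsableOnTT - memUsedOnTT;
    return taskMemoryMB <= freeMemOnTT;
  }

  public static void main(String[] args) {
    long slotMB = 1024;                              // assumed 1024MB slots
    long used = roundToSlots(1536, slotMB);          // a 1536MB task reserves 2 slots
    System.out.println(used);                        // 2048
    System.out.println(fits(3072, slotMB, 4, used)); // 2048MB free < 3072MB -> false
    System.out.println(fits(2048, slotMB, 4, used)); // exactly fits -> true
  }
}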


