hadoop-mapreduce-commits mailing list archives

From: yhema...@apache.org
Subject: svn commit: r788036 [1/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapr...
Date: Wed, 24 Jun 2009 14:22:15 GMT
Author: yhemanth
Date: Wed Jun 24 14:22:13 2009
New Revision: 788036

URL: http://svn.apache.org/viewvc?rev=788036&view=rev
Log:
MAPREDUCE-516. Fix the starvation problem in the Capacity Scheduler when running High RAM Jobs. Contributed by Arun Murthy.
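
In short, the patch makes the Capacity Scheduler reserve a tasktracker's slots for a high-RAM job whenever the tracker cannot currently fit one of that job's tasks, instead of handing those slots to smaller jobs; once enough trackers are reserved to cover the job's pending tasks, no further trackers are blocked. The following is a minimal, self-contained sketch of that idea only; the class and method names (ReservationSketch, assign, hasSufficientReservedTrackers) are illustrative and are not the patched scheduler API.

// Minimal sketch of the reservation idea behind MAPREDUCE-516. All names here
// are illustrative; the real logic lives in CapacityTaskScheduler/TaskTracker.
public class ReservationSketch {

  /** A tasktracker with some free slots, possibly reserved for one job. */
  static class Tracker {
    final String name;
    int freeSlots;
    String reservedForJob;                 // null when not reserved

    Tracker(String name, int freeSlots) {
      this.name = name;
      this.freeSlots = freeSlots;
    }
  }

  /** A job; a high-RAM job needs more than one slot per task. */
  static class Job {
    final String id;
    final int slotsPerTask;
    int pendingTasks;
    int reservedTrackers;

    Job(String id, int slotsPerTask, int pendingTasks) {
      this.id = id;
      this.slotsPerTask = slotsPerTask;
      this.pendingTasks = pendingTasks;
    }

    /** Analogous to hasSufficientReservedTaskTrackers() in the patch. */
    boolean hasSufficientReservedTrackers() {
      return reservedTrackers >= pendingTasks;
    }
  }

  /** On a heartbeat: run a task if it fits, otherwise reserve the tracker. */
  static String assign(Tracker tt, Job job) {
    if (tt.freeSlots >= job.slotsPerTask) {
      tt.freeSlots -= job.slotsPerTask;
      job.pendingTasks--;
      return "scheduled a task of " + job.id + " on " + tt.name;
    }
    if (!job.hasSufficientReservedTrackers()) {
      tt.reservedForJob = job.id;          // block these slots for the job
      job.reservedTrackers++;
      return "reserved " + tt.name + " for " + job.id;
    }
    return "no action for " + tt.name;     // enough trackers already reserved
  }

  public static void main(String[] args) {
    Job highRam = new Job("job_high_ram", 2, 2);   // 2 slots per task, 2 pending
    Tracker tt1 = new Tracker("tt1", 1);           // too little room: reserved
    Tracker tt2 = new Tracker("tt2", 2);           // enough room: gets a task
    System.out.println(assign(tt1, highRam));
    System.out.println(assign(tt2, highRam));
  }
}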

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java.orig
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
    hadoop/mapreduce/trunk/src/webapps/job/jobfailures.jsp
    hadoop/mapreduce/trunk/src/webapps/job/machines.jsp
    hadoop/mapreduce/trunk/src/webapps/job/taskdetails.jsp

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jun 24 14:22:13 2009
@@ -2,6 +2,11 @@
 
 Trunk (unreleased changes)
 
+  INCOMPATIBLE CHANGES
+
+    MAPREDUCE-516. Fix the starvation problem in the Capacity Scheduler 
+    when running High RAM Jobs. (Arun Murthy via yhemanth)
+
   NEW FEATURES
 
     HADOOP-5887. Sqoop should create tables in Hive metastore after importing

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Jun 24 14:22:13 2009
@@ -32,6 +32,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
@@ -290,13 +293,14 @@
 
     /** our TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
-    protected CapacityTaskScheduler.TYPE type = null;
+    protected TaskType type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException;
 
     int getSlotsOccupied(JobInProgress job) {
-      return getRunningTasks(job) * getSlotsPerTask(job);
+      return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) * 
+             getSlotsPerTask(job);
     }
 
     abstract int getClusterCapacity();
@@ -304,6 +308,8 @@
     abstract int getRunningTasks(JobInProgress job);
     abstract int getPendingTasks(JobInProgress job);
     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
+    abstract int getNumReservedTaskTrackers(JobInProgress job);
+    
     /**
      * To check if job has a speculative task on the particular tracker.
      * 
@@ -315,6 +321,18 @@
         TaskTrackerStatus tts);
 
     /**
+     * Check if the given job has sufficient reserved tasktrackers for all its
+     * pending tasks.
+     * 
+     * @param job job to check for sufficient reserved tasktrackers 
+     * @return <code>true</code> if the job has reserved tasktrackers,
+     *         else <code>false</code>
+     */
+    boolean hasSufficientReservedTaskTrackers(JobInProgress job) {
+      return getNumReservedTaskTrackers(job) >= getPendingTasks(job);
+    }
+    
+    /**
      * List of QSIs for assigning tasks.
      * Queues are ordered by a ratio of (# of running tasks)/capacity, which
      * indicates how much 'free space' the queue has, or how much it is over
@@ -419,10 +437,10 @@
      * It tries to get a task from jobs in a single queue. 
      * Always return a TaskLookupResult object. Don't return null. 
      */
-    private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
-        QueueSchedulingInfo qsi)
-        throws IOException {
-
+    private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
+                                              QueueSchedulingInfo qsi)
+    throws IOException {
+      TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
       // we only look at jobs in the running queues, as these are the ones
       // who have been potentially initialized
 
@@ -442,9 +460,9 @@
         //a task to be scheduled on the task tracker.
         //if we find a job then we pass it on.
         if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
-            taskTracker)) {
+                                                              taskTrackerStatus)) {
           // We found a suitable job. Get task from it.
-          Task t = obtainNewTask(taskTracker, j);
+          Task t = obtainNewTask(taskTrackerStatus, j);
           //if there is a task return it immediately.
           if (t != null) {
             // we're successful in getting a task
@@ -457,10 +475,18 @@
           }
         } else {
           //if memory requirements don't match then we check if the 
-          //job has either pending or speculative task. If the job
-          //has pending or speculative task we block till this job
-          //tasks get scheduled. So that high memory jobs are not starved
-          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
+          //job has either pending or speculative task or has insufficient number
+          //of 'reserved' tasktrackers to cover all pending tasks. If so
+          //we reserve the current tasktracker for this job so that 
+          //high memory jobs are not starved
+          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus) || 
+              !hasSufficientReservedTaskTrackers(j)) {
+            // Reserve all available slots on this tasktracker
+            LOG.info(j.getJobID() + ": Reserving " + taskTracker.getTrackerName() + 
+                     " since memory-requirements don't match");
+            taskTracker.reserveSlots(type, j, taskTracker.getAvailableSlots(type));
+            
+            // Block
             return TaskLookupResult.getMemFailedResult();
           } 
         }//end of memory check block
@@ -472,7 +498,9 @@
       // the user limit for some user is too strict, i.e., there's at least 
       // one user who doesn't have enough tasks to satisfy his limit. If 
       // it's the latter case, re-look at jobs without considering user 
-      // limits, and get a task from the first eligible job
+      // limits, and get a task from the first eligible job; however
+      // we do not 'reserve' slots on tasktrackers anymore since the user is 
+      // already over the limit
       // Note: some of the code from above is repeated here. This is on 
       // purpose as it improves overall readability.  
       // Note: we walk through jobs again. Some of these jobs, which weren't
@@ -488,9 +516,9 @@
           continue;
         }
         if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
-            taskTracker)) {
+            taskTrackerStatus)) {
           // We found a suitable job. Get task from it.
-          Task t = obtainNewTask(taskTracker, j);
+          Task t = obtainNewTask(taskTrackerStatus, j);
           //if there is a task return it immediately.
           if (t != null) {
             // we're successful in getting a task
@@ -505,7 +533,7 @@
           //has pending or speculative task we block till this job
           //tasks get scheduled, so that high memory jobs are not 
           //starved
-          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
+          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus)) {
             return TaskLookupResult.getMemFailedResult();
           } 
         }//end of memory check block
@@ -520,10 +548,52 @@
     // Always return a TaskLookupResult object. Don't return null. 
     // The caller is responsible for ensuring that the QSI objects and the 
     // collections are up-to-date.
-    private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+    private TaskLookupResult assignTasks(TaskTracker taskTracker) 
+    throws IOException {
+      TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
 
       printQSIs();
 
+      // Check if this tasktracker has been reserved for a job...
+      JobInProgress job = taskTracker.getJobForFallowSlot(type);
+      if (job != null) {
+        int availableSlots = taskTracker.getAvailableSlots(type);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " + 
+                    taskTracker.getTrackerName() + " with " + availableSlots + 
+                    " '" + type + "' slots");
+        }
+
+        if (availableSlots >= job.getNumSlotsPerTask(type)) {
+          // Unreserve 
+          taskTracker.unreserveSlots(type, job);
+          
+          // We found a suitable job. Get task from it.
+          Task t = obtainNewTask(taskTrackerStatus, job);
+          //if there is a task return it immediately.
+          if (t != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.info(job.getJobID() + ": Got " + t.getTaskID() + 
+                       " for reserved tasktracker " + 
+                       taskTracker.getTrackerName());
+            }
+            // we're successful in getting a task
+            return TaskLookupResult.getTaskFoundResult(t);
+          } 
+        } else {
+          // Re-reserve the current tasktracker
+          taskTracker.reserveSlots(type, job, availableSlots);
+          
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(job.getJobID() + ": Re-reserving " + 
+                      taskTracker.getTrackerName());
+          }
+
+          return TaskLookupResult.getMemFailedResult(); 
+        }
+      }
+      
+      
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         // we may have queues with capacity=0. We shouldn't look at jobs from 
         // these queues
@@ -600,7 +670,7 @@
 
     MapSchedulingMgr(CapacityTaskScheduler schedulr) {
       super(schedulr);
-      type = CapacityTaskScheduler.TYPE.MAP;
+      type = TaskType.MAP;
       queueComparator = mapComparator;
     }
 
@@ -631,9 +701,8 @@
 
     @Override
     int getSlotsPerTask(JobInProgress job) {
-      long myVmem = job.getJobConf().getMemoryForMapTask();
-      return (int) (Math.ceil((float) myVmem
-          / (float) scheduler.getMemSizeForMapSlot()));
+      return 
+        job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());    
     }
 
     @Override
@@ -641,6 +710,10 @@
       return qsi.mapTSI;
     }
 
+    int getNumReservedTaskTrackers(JobInProgress job) {
+      return job.getNumReservedTaskTrackersForMaps();
+    }
+
     @Override
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //Check if job supports speculative map execution first then 
@@ -659,7 +732,7 @@
 
     ReduceSchedulingMgr(CapacityTaskScheduler schedulr) {
       super(schedulr);
-      type = CapacityTaskScheduler.TYPE.REDUCE;
+      type = TaskType.REDUCE;
       queueComparator = reduceComparator;
     }
 
@@ -691,9 +764,8 @@
 
     @Override
     int getSlotsPerTask(JobInProgress job) {
-      long myVmem = job.getJobConf().getMemoryForReduceTask();
-      return (int) (Math.ceil((float) myVmem
-          / (float) scheduler.getMemSizeForReduceSlot()));
+      return
+        job.getJobConf().computeNumSlotsPerReduce(scheduler.getMemSizeForReduceSlot());    
     }
 
     @Override
@@ -701,6 +773,10 @@
       return qsi.reduceTSI;
     }
 
+    int getNumReservedTaskTrackers(JobInProgress job) {
+      return job.getNumReservedTaskTrackersForReduces();
+    }
+
     @Override
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //check if the job supports reduce speculative execution first then
@@ -729,9 +805,10 @@
   /** whether scheduler has started or not */
   private boolean started = false;
 
-  static String JOB_SCHEDULING_INFO_FORMAT_STRING =
-      "%s running map tasks using %d map slots,"
-          + " %s running reduce tasks using %d reduce slots.";
+  final static String JOB_SCHEDULING_INFO_FORMAT_STRING =
+    "%s running map tasks using %d map slots. %d additional slots reserved." +
+    " %s running reduce tasks using %d reduce slots." +
+    " %d additional slots reserved.";
   /**
    * A clock class - can be mocked out for testing.
    */
@@ -741,11 +818,6 @@
     }
   }
 
-  // can be replaced with a global type, if we have one
-  protected static enum TYPE {
-    MAP, REDUCE
-  }
-
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
@@ -811,10 +883,10 @@
     return limitMaxMemForReduceTasks;
   }
 
-  String[] getOrderedQueues(CapacityTaskScheduler.TYPE type) {
-    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+  String[] getOrderedQueues(TaskType type) {
+    if (type == TaskType.MAP) {
       return mapScheduler.getOrderedQueues();
-    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+    } else if (type == TaskType.REDUCE) {
       return reduceScheduler.getOrderedQueues();
     }
     return null;
@@ -967,13 +1039,26 @@
 
         int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
         int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
+        int numRunningMapSlots = 
+          numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
+        int numRunningReduceSlots =
+          numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
         int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
         int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
-        j.setSchedulingInfo(String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
-            Integer.valueOf(numMapsRunningForThisJob), Integer
-                .valueOf(numMapSlotsForThisJob), Integer
-                .valueOf(numReducesRunningForThisJob), Integer
-                .valueOf(numReduceSlotsForThisJob)));
+        int numReservedMapSlotsForThisJob = 
+          (mapScheduler.getNumReservedTaskTrackers(j) * 
+           mapScheduler.getSlotsPerTask(j)); 
+        int numReservedReduceSlotsForThisJob = 
+          (reduceScheduler.getNumReservedTaskTrackers(j) * 
+           reduceScheduler.getSlotsPerTask(j)); 
+        j.setSchedulingInfo(
+            String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
+                          Integer.valueOf(numMapsRunningForThisJob), 
+                          Integer.valueOf(numRunningMapSlots),
+                          Integer.valueOf(numReservedMapSlotsForThisJob),
+                          Integer.valueOf(numReducesRunningForThisJob), 
+                          Integer.valueOf(numRunningReduceSlots),
+                          Integer.valueOf(numReservedReduceSlotsForThisJob)));
         qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
         qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
         qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
@@ -1029,10 +1114,12 @@
    *  
    */
   @Override
-  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
-      throws IOException {
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
+  throws IOException {
     
     TaskLookupResult tlr;
+    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
+    
     /* 
      * If TT has Map and Reduce slot free, we need to figure out whether to
      * give it a Map or Reduce task.
@@ -1042,14 +1129,14 @@
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
     int reduceClusterCapacity = c.getMaxReduceTasks();
-    int maxMapTasks = taskTracker.getMaxMapTasks();
-    int currentMapTasks = taskTracker.countMapTasks();
-    int maxReduceTasks = taskTracker.getMaxReduceTasks();
-    int currentReduceTasks = taskTracker.countReduceTasks();
-    LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() + 
-        ", run maps=" + taskTracker.countMapTasks() + ", max reds=" + 
-        taskTracker.getMaxReduceTasks() + ", run reds=" + 
-        taskTracker.countReduceTasks() + ", map cap=" + 
+    int maxMapSlots = taskTrackerStatus.getMaxMapSlots();
+    int currentMapSlots = taskTrackerStatus.countOccupiedMapSlots();
+    int maxReduceSlots = taskTrackerStatus.getMaxReduceSlots();
+    int currentReduceSlots = taskTrackerStatus.countOccupiedReduceSlots();
+    LOG.debug("TT asking for task, max maps=" + taskTrackerStatus.getMaxMapSlots() + 
+        ", run maps=" + taskTrackerStatus.countMapTasks() + ", max reds=" + 
+        taskTrackerStatus.getMaxReduceSlots() + ", run reds=" + 
+        taskTrackerStatus.countReduceTasks() + ", map cap=" + 
         mapClusterCapacity + ", red cap = " + 
         reduceClusterCapacity);
 
@@ -1063,8 +1150,8 @@
     // make sure we get our map or reduce scheduling object to update its 
     // collection of QSI objects too. 
 
-    if ((maxReduceTasks - currentReduceTasks) > 
-    (maxMapTasks - currentMapTasks)) {
+    if ((maxReduceSlots - currentReduceSlots) > 
+    (maxMapSlots - currentMapSlots)) {
       // get a reduce task first
       reduceScheduler.updateCollectionOfQSIs();
       tlr = reduceScheduler.assignTasks(taskTracker);
@@ -1078,7 +1165,7 @@
                                   == tlr.getLookUpStatus() ||
                 TaskLookupResult.LookUpStatus.NO_TASK_FOUND
                                   == tlr.getLookUpStatus())
-          && (maxMapTasks > currentMapTasks)) {
+          && (maxMapSlots > currentMapSlots)) {
         mapScheduler.updateCollectionOfQSIs();
         tlr = mapScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -1101,7 +1188,7 @@
                                     == tlr.getLookUpStatus()
                 || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
                                     == tlr.getLookUpStatus())
-          && (maxReduceTasks > currentReduceTasks)) {
+          && (maxReduceSlots > currentReduceSlots)) {
         reduceScheduler.updateCollectionOfQSIs();
         tlr = reduceScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -1133,10 +1220,33 @@
       i++;
     }
     qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+    
+    // setup scheduler specific job information
+    preInitializeJob(job);
+    
     LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
               + job.getProfile().getUser() + ", user now has " + i + " jobs");
   }
 
+  /**
+   * Setup {@link CapacityTaskScheduler} specific information prior to
+   * job initialization.
+   */
+  void preInitializeJob(JobInProgress job) {
+    JobConf jobConf = job.getJobConf();
+    
+    // Compute number of slots required to run a single map/reduce task
+    int slotsPerMap = 1;
+    int slotsPerReduce = 1;
+    if (memoryMatcher.isSchedulingBasedOnMemEnabled()) {
+      slotsPerMap = jobConf.computeNumSlotsPerMap(getMemSizeForMapSlot());
+     slotsPerReduce = 
+       jobConf.computeNumSlotsPerReduce(getMemSizeForReduceSlot());
+    }
+    job.setNumSlotsPerMap(slotsPerMap);
+    job.setNumSlotsPerReduce(slotsPerReduce);
+  }
+  
   // called when a job completes
   synchronized void jobCompleted(JobInProgress job) {
     QueueSchedulingInfo qsi = 
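
Two side effects of the reservation mechanism are visible in the hunks above: getSlotsOccupied() now counts trackers reserved for a job as if they were running tasks, and the per-job scheduling info string reports reserved slots separately for maps and reduces. The snippet below is a hedged, standalone illustration of that accounting; SlotAccountingSketch and slotsOccupied() are made-up names, and only the format string is copied from the patch.

// Standalone sketch of the new slot accounting (illustrative names only).
public class SlotAccountingSketch {

  // Copied from the patched JOB_SCHEDULING_INFO_FORMAT_STRING.
  static final String INFO_FORMAT =
      "%s running map tasks using %d map slots. %d additional slots reserved." +
      " %s running reduce tasks using %d reduce slots." +
      " %d additional slots reserved.";

  /** Reserved trackers count against the queue like running tasks do. */
  static int slotsOccupied(int runningTasks, int reservedTrackers,
                           int slotsPerTask) {
    return (reservedTrackers + runningTasks) * slotsPerTask;
  }

  public static void main(String[] args) {
    int runningMaps = 1, reservedMapTrackers = 2, slotsPerMap = 2;
    int runningReduces = 1, reservedReduceTrackers = 1, slotsPerReduce = 2;

    System.out.println("map slots occupied = "
        + slotsOccupied(runningMaps, reservedMapTrackers, slotsPerMap));

    System.out.println(String.format(INFO_FORMAT,
        runningMaps, runningMaps * slotsPerMap,
        reservedMapTrackers * slotsPerMap,
        runningReduces, runningReduces * slotsPerReduce,
        reservedReduceTrackers * slotsPerReduce));
  }
}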

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Wed Jun 24 14:22:13 2009
@@ -267,7 +267,8 @@
    * poller
    */
 
-  void init(Set<String> queues, CapacitySchedulerConf capacityConf) {
+  void init(Set<String> queues, 
+            CapacitySchedulerConf capacityConf) {
     for (String queue : queues) {
       int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
       int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Wed Jun 24 14:22:13 2009
@@ -20,6 +20,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TaskType;
 
 class MemoryMatcher {
 
@@ -54,55 +55,26 @@
    *          null if memory cannot be computed for some reason.
    */
   synchronized Long getMemReservedForTasks(
-      TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
+      TaskTrackerStatus taskTracker, TaskType taskType) {
     long vmem = 0;
 
     for (TaskStatus task : taskTracker.getTaskReports()) {
       // the following task states are one in which the slot is
       // still occupied and hence memory of the task should be
       // accounted in used memory.
-      if ((task.getRunState() == TaskStatus.State.RUNNING)
-          || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        JobInProgress job =
-            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID());
-        if (job == null) {
-          // This scenario can happen if a job was completed/killed
-          // and retired from JT's memory. In this state, we can ignore
-          // the running task status and compute memory for the rest of
-          // the tasks. However, any scheduling done with this computation
-          // could result in over-subscribing of memory for tasks on this
-          // TT (as the unaccounted for task is still running).
-          // So, it is safer to not schedule anything for this TT
-          // One of the ways of doing that is to return null from here
-          // and check for null in the calling method.
-          LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
-              + "a running / commit pending task: " + task.getTaskID()
-              + " but no corresponding job was found. "
-              + "Maybe job was retired. Not computing "
-              + "memory values for this TT.");
-          return null;
-        }
-
-        JobConf jConf = job.getJobConf();
-
-        // Get the memory "allotted" for this task by rounding off the job's
-        // tasks' memory limits to the nearest multiple of the slot-memory-size
-        // set on JT. This essentially translates to tasks of a high memory job
-        // using multiple slots.
+      if ((task.getRunState() == TaskStatus.State.RUNNING) ||
+          (task.getRunState() == TaskStatus.State.UNASSIGNED) ||
+          (task.inTaskCleanupPhase())) {
+        // Get the memory "allotted" for this task based on number of slots
         long myVmem = 0;
-        if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
-          myVmem = jConf.getMemoryForMapTask();
-          myVmem =
-              (long) (scheduler.getMemSizeForMapSlot() * Math
-                  .ceil((float) myVmem
-                      / (float) scheduler.getMemSizeForMapSlot()));
+        if (task.getIsMap() && taskType == TaskType.MAP) {
+          long memSizePerMapSlot = scheduler.getMemSizeForMapSlot(); 
+          myVmem = 
+            memSizePerMapSlot * task.getNumSlots();
         } else if (!task.getIsMap()
-            && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
-          myVmem = jConf.getMemoryForReduceTask();
-          myVmem =
-              (long) (scheduler.getMemSizeForReduceSlot() * Math
-                  .ceil((float) myVmem
-                      / (float) scheduler.getMemSizeForReduceSlot()));
+            && taskType == TaskType.REDUCE) {
+          long memSizePerReduceSlot = scheduler.getMemSizeForReduceSlot(); 
+          myVmem = memSizePerReduceSlot * task.getNumSlots();
         }
         vmem += myVmem;
       }
@@ -118,8 +90,8 @@
    * @param taskTracker
    * @return true if this TT has enough memory for this job. False otherwise.
    */
-  boolean matchesMemoryRequirements(JobInProgress job,
-      CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) {
+  boolean matchesMemoryRequirements(JobInProgress job,TaskType taskType, 
+                                    TaskTrackerStatus taskTracker) {
 
     LOG.debug("Matching memory requirements of " + job.getJobID().toString()
         + " for scheduling on " + taskTracker.trackerName);
@@ -131,44 +103,36 @@
     }
 
     Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
-    if (memUsedOnTT == null) {
-      // For some reason, maybe because we could not find the job
-      // corresponding to a running task (as can happen if the job
-      // is retired in between), we could not compute the memory state
-      // on this TT. Treat this as an error, and fail memory
-      // requirements.
-      LOG.info("Could not compute memory for taskTracker: "
-          + taskTracker.getHost() + ". Failing memory requirements.");
-      return false;
-    }
-
     long totalMemUsableOnTT = 0;
-
     long memForThisTask = 0;
-    if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+    if (taskType == TaskType.MAP) {
       memForThisTask = job.getJobConf().getMemoryForMapTask();
       totalMemUsableOnTT =
-          scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks();
-    } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+          scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
+    } else if (taskType == TaskType.REDUCE) {
       memForThisTask = job.getJobConf().getMemoryForReduceTask();
       totalMemUsableOnTT =
           scheduler.getMemSizeForReduceSlot()
-              * taskTracker.getMaxReduceTasks();
+              * taskTracker.getMaxReduceSlots();
     }
 
     long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
     if (memForThisTask > freeMemOnTT) {
-      LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
-          + freeMemOnTT + "). A " + taskType + " task from "
-          + job.getJobID().toString() + " cannot be scheduled on TT "
-          + taskTracker.trackerName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+                  + freeMemOnTT + "). A " + taskType + " task from "
+                  + job.getJobID().toString() + " cannot be scheduled on TT "
+                  + taskTracker.trackerName);
+      }
       return false;
     }
 
-    LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
-        + freeMemOnTT + ". A " + taskType.toString() + " task from "
-        + job.getJobID().toString() + " matches memory requirements on TT "
-        + taskTracker.trackerName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+                + freeMemOnTT + ". A " + taskType.toString() + " task from "
+                + job.getJobID().toString() + " matches memory requirements "
+                + "on TT "+ taskTracker.trackerName);
+    }
     return true;
   }
 }
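
The net effect of this hunk is that MemoryMatcher no longer needs the owning job's JobConf to account for a task's memory: any task that is RUNNING, UNASSIGNED, or in its cleanup phase simply reserves slot-size times the number of slots it holds, so a job that has already retired can no longer force the scheduler to skip the tracker. Below is a small, self-contained sketch of that computation; MemReservedSketch, RunningTask, and memReservedForTasks are illustrative names, not the patched API.

// Sketch of slot-based memory accounting per tracker (illustrative names).
import java.util.Arrays;
import java.util.List;

public class MemReservedSketch {

  /** A task report: whether it is a map and how many slots it occupies. */
  static class RunningTask {
    final boolean isMap;
    final int numSlots;
    RunningTask(boolean isMap, int numSlots) {
      this.isMap = isMap;
      this.numSlots = numSlots;
    }
  }

  /** Memory (MB) reserved on one tracker for map OR reduce tasks. */
  static long memReservedForTasks(List<RunningTask> tasks, boolean forMaps,
                                  long memPerMapSlot, long memPerReduceSlot) {
    long vmem = 0;
    for (RunningTask t : tasks) {
      if (t.isMap == forMaps) {
        long perSlot = forMaps ? memPerMapSlot : memPerReduceSlot;
        vmem += perSlot * t.numSlots;      // no JobConf lookup needed
      }
    }
    return vmem;
  }

  public static void main(String[] args) {
    // One normal map (1 slot) and one high-RAM map (2 slots) on the tracker.
    List<RunningTask> reports = Arrays.asList(
        new RunningTask(true, 1), new RunningTask(true, 2));
    System.out.println(
        memReservedForTasks(reports, true, 1024L, 1024L) + " MB for maps");
  }
}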

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Jun 24 14:22:13 2009
@@ -39,6 +39,7 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 public class TestCapacityScheduler extends TestCase {
 
@@ -199,7 +200,8 @@
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 
+                                super.numSlotsPerMap) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -241,7 +243,8 @@
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
-      Task task = new ReduceTask("", attemptId, 0, 10) {
+      Task task = new ReduceTask("", attemptId, 0, 10, 
+                        super.numSlotsPerReduce) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -336,7 +339,7 @@
     
     FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
         boolean isMap, FakeJobInProgress job) {
-      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
+      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
       this.isMap = isMap;
       this.fakeJob = job;
       activeTasks = new TreeMap<TaskAttemptID, String>();
@@ -441,8 +444,8 @@
       new ArrayList<JobInProgressListener>();
     FakeQueueManager qm = new FakeQueueManager();
     
-    private Map<String, TaskTrackerStatus> trackers =
-      new HashMap<String, TaskTrackerStatus>();
+    private Map<String, TaskTracker> trackers =
+      new HashMap<String, TaskTracker>();
     private Map<String, TaskStatus> taskStatuses = 
       new HashMap<String, TaskStatus>();
     private Map<JobID, JobInProgress> jobs =
@@ -458,16 +461,22 @@
       this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
       for (int i = 1; i < numTaskTrackers + 1; i++) {
         String ttName = "tt" + i;
-        trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", i,
-            new ArrayList<TaskStatus>(), 0, maxMapTasksPerTracker,
-            maxReduceTasksPerTracker));
+        TaskTracker tt = new TaskTracker(ttName);
+        tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", i,
+                                           new ArrayList<TaskStatus>(), 0, 
+                                           maxMapTasksPerTracker,
+                                           maxReduceTasksPerTracker));
+        trackers.put(ttName, tt);
       }
     }
     
     public void addTaskTracker(String ttName) {
-      trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", 1,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      TaskTracker tt = new TaskTracker(ttName);
+      tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
+                                         new ArrayList<TaskStatus>(), 0,
+                                         maxMapTasksPerTracker, 
+                                         maxReduceTasksPerTracker));
+      trackers.put(ttName, tt);
     }
     
     public ClusterStatus getClusterStatus() {
@@ -508,7 +517,11 @@
     }
 
     public Collection<TaskTrackerStatus> taskTrackers() {
-      return trackers.values();
+      List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+      for (TaskTracker tt : trackers.values()) {
+        statuses.add(tt.getStatus());
+      }
+      return statuses;
     }
 
 
@@ -527,7 +540,7 @@
       }
     }
     
-    public TaskTrackerStatus getTaskTracker(String trackerID) {
+    public TaskTracker getTaskTracker(String trackerID) {
       return trackers.get(trackerID);
     }
     
@@ -547,10 +560,15 @@
         public boolean getIsMap() {
           return t.isMapTask();
         }
+        
+        @Override
+        public int getNumSlots() {
+          return t.getNumSlotsRequired();
+        }
       };
       taskStatuses.put(t.getTaskID().toString(), status);
       status.setRunState(TaskStatus.State.RUNNING);
-      trackers.get(taskTrackerName).getTaskReports().add(status);
+      trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
     }
     
     public void finishTask(String taskTrackerName, String tipId, 
@@ -1718,14 +1736,14 @@
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // Total 2 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
     // at this point, the scheduler tries to schedule another map from j1. 
     // there isn't enough space. The second job's reduce should be scheduled.
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
         100.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
   }
@@ -1774,17 +1792,19 @@
     // Fill the second tt with this job.
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 1, 0, 0, 0, 0),
         (String) job1.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
     // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
         25.0f);
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 1, 0, 1, 1, 0),
         (String) job1.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
 
@@ -1801,18 +1821,20 @@
 
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
     // Total 3 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 2, 0, 0, 0, 0),
         (String) job2.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     // Total 3 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
         75.0f);
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 2, 0, 1, 2, 0),
         (String) job2.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
 
@@ -1832,16 +1854,24 @@
     assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
-        75.0f);
+    // reserved tasktrackers contribute to occupied slots
+    // for maps, both tasktrackers are reserved.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 7, 175.0f);
+    // for reduces, only one tasktracker is reserved, because
+    // the reduce scheduler is not visited for tt1 (as it has
+    // 0 slots free).
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 5,
+        125.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+    LOG.info(job2.getSchedulingInfo());
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 2, 4, 1, 2, 2),
         (String) job2.getSchedulingInfo());
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        0, 0, 0, 0, 0, 0),
         (String) job3.getSchedulingInfo());
   }
 
@@ -1892,62 +1922,65 @@
     // 1st cycle - 1 map gets assigned.
     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
     checkMemReservedForTasksOnTT("tt1",  512L, 0L);
 
     // 1st cycle of reduces - 1 reduce gets assigned.
     Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
         50.0f);
     checkMemReservedForTasksOnTT("tt1",  512L, 512L);
     
     // kill this job !
     taskTrackerManager.killJob(job1.getJobID());
     // No more map/reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
+    checkOccupiedSlots("default", TaskType.MAP, 0, 0, 0.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 0, 0,
         0.0f);
     
     // retire the job
     taskTrackerManager.removeJob(job1.getJobID());
     
     // submit another job.
-    LOG.debug("Submitting another normal job with 1 map and 1 reduce");
+    LOG.debug("Submitting another normal job with 2 maps and 2 reduces");
     jConf = new JobConf();
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
     jConf.setMemoryForMapTask(512);
     jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
     
-    // 2nd cycle - nothing should get assigned. Memory matching code
-    // will see the job is missing and fail memory requirements.
+    // since with HADOOP-5964, we don't rely on a job conf to get
+    // the memory occupied, scheduling should be able to work correctly.
+    t1 = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
+    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+
+    // assign a reduce now.
+    t1 = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
+    checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
+    
+    // now, no more can be assigned because all the slots are blocked.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    checkMemReservedForTasksOnTT("tt1", null, null);
 
-    // calling again should not make a difference, as the task is still running
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    checkMemReservedForTasksOnTT("tt1", null, null);
-    
     // finish the tasks on the tracker.
     taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
     taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
-
+    
     // now a new task can be assigned.
-    t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    // Total 1 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
-    checkMemReservedForTasksOnTT("tt1", 512L, 0L);
-
+    t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
+    // memory used will change because of the finished task above.
+    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+    
     // reduce can be assigned.
-    t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    // Total 1 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
-        50.0f);
-    checkMemReservedForTasksOnTT("tt1", 512L, 512L);
+    t = checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+    checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
   }
 
   /*
@@ -2278,25 +2311,13 @@
   }
 
   /**
-   * Test case to test scheduling of
-   * <ol> 
-   * <li>High ram job with speculative map execution.
-   * <ul>
-   * <li>Submit one high ram job which has speculative map.</li>
-   * <li>Submit a normal job which has no speculative map.</li>
-   * <li>Scheduler should schedule first all map tasks from first job and block
-   * the cluster till both maps from first job get completed.
-   * </ul>
-   * </li>
-   * <li>High ram job with speculative reduce execution.
-   * <ul>
-   * <li>Submit one high ram job which has speculative reduce.</li>
-   * <li>Submit a normal job which has no speculative reduce.</li>
-   * <li>Scheduler should schedule first all reduce tasks from first job and
-   * block the cluster till both reduces are completed.</li>
-   * </ul>
-   * </li>
-   * </ol>
+   * Test case to test scheduling of jobs with speculative execution
+   * in the face of high RAM jobs.
+   * 
+   * Essentially, the test verifies that if a high RAM job has speculative
+   * tasks that cannot run because of memory requirements, we block
+   * that node and do not return any tasks to it.
+   * 
    * @throws IOException
    */
   public void testHighRamJobWithSpeculativeExecution() throws IOException {
@@ -2323,8 +2344,18 @@
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
+    // Submit a normal job that should occupy a node
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(0);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJob(JobStatus.PREP, jConf);
+    
     //Submit a high memory job with speculative tasks.
-    JobConf jConf = new JobConf();
+    jConf = new JobConf();
     jConf.setMemoryForMapTask(2 * 1024);
     jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(1);
@@ -2333,13 +2364,13 @@
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(true);
     jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job1 =
+    FakeJobInProgress job2 =
         new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
             taskTrackerManager, "u1");
-    taskTrackerManager.submitJob(job1);
+    taskTrackerManager.submitJob(job2);
 
     //Submit normal job
-    jConf = new JobConf();
+    jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(1);
@@ -2348,127 +2379,46 @@
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(false);
     jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
+    FakeJobInProgress job3 = submitJob(JobStatus.PREP, jConf);
 
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
 
-    // first, a map from j1 will run
-    // at this point, there is a speculative task for the same job to be
-    //scheduled. This task would be scheduled. Till the tasks from job1 gets
-    //complete none of the tasks from other jobs would be scheduled.
+    // Have one node on which all tasks of job1 are scheduled.
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-    assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
-    // Total 2 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
 
-    //make same tracker get back, check if you are blocking. Your job
-    //has speculative map task so tracker should be blocked even tho' it
-    //can run job2's map.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    // Total 2 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+    // raise events to initialize the 3rd job
+    controlledInitializationPoller.selectJobsToInitialize();
+    raiseStatusChangeEvents(scheduler.jobQueuesManager);
 
-    //TT2 now gets speculative map of the job1
-    checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
-    // Total 4 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
+    // On the second node, one task of the high RAM job can be scheduled.
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
-
-    // Now since the first job has no more speculative maps, it can schedule
-    // the second job.
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    // Total 5 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
-    checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
-
-    //finish everything
+    assertEquals("pending maps greater than zero " , job2.pendingMaps(), 0);
+    // Total 4 map slots should be accounted for.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
+    
+    // now when the first node gets back, it cannot run any task
+    // because job2 has a speculative task that can run on this node.
+    // This is even though job3's tasks can run on this node.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    // Reservation will count for 2 more slots.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+    
+    // finish one task from tt1.
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", 
         job1);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", 
-        job1);
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", 
-        job2);
-    taskTrackerManager.finalizeJob(job1);
-    taskTrackerManager.finalizeJob(job2);
     
-    //Now submit high ram job with speculative reduce and check.
-    jConf = new JobConf();
-    jConf.setMemoryForMapTask(2 * 1024);
-    jConf.setMemoryForReduceTask(2 * 1024L);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(true);
-    FakeJobInProgress job3 =
-        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
-            taskTrackerManager, "u1");
-    taskTrackerManager.submitJob(job3);
-
-    //Submit normal job w.r.t reduces
-    jConf = new JobConf();
-    jConf.setMemoryForMapTask(1 * 1024L);
-    jConf.setMemoryForReduceTask(1 * 1024L);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
+    // now, we can schedule the speculative task on tt1
+    checkAssignment("tt1", "attempt_test_0002_m_000001_1 on tt1");
     
-    controlledInitializationPoller.selectJobsToInitialize();
-    raiseStatusChangeEvents(scheduler.jobQueuesManager);
-
-    // Finish up the map scheduler
+    // finish one more task from tt1.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", 
+        job1);
+    
+    // now the new job's tasks can be scheduled.
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
-    // Total 2 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
-    checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
-    // Total 3 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
-
-    // first, a reduce from j3 will run
-    // at this point, there is a speculative task for the same job to be
-    //scheduled. This task would be scheduled. Till the tasks from job3 gets
-    //complete none of the tasks from other jobs would be scheduled.
-    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
-    assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
-        0);
-    // Total 2 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
-        33.3f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
-
-    //make same tracker get back, check if you are blocking. Your job
-    //has speculative reduce task so tracker should be blocked even tho' it
-    //can run job4's reduce.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    // Total 2 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
-        33.3f);
-
-    //TT2 now gets speculative reduce of the job3
-    checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
-    // Total 4 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
-        66.7f);
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
-
-    // Now since j3 has no more speculative reduces, it can schedule
-    // the j4.
-    checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
-    // Total 5 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
-        83.3f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
   }
 
   /**
@@ -2526,32 +2476,32 @@
     // Map 1 of high memory job
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 1 of high memory job
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Map 1 of normal job
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 1 of normal job
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Map 2 of normal job
     checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 2 of normal job
     checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Now both the queues are equally served. But the comparator doesn't change
     // the order if queues are equally served.
@@ -2559,32 +2509,32 @@
     // Map 3 of normal job
     checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 3 of normal job
     checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Map 2 of high memory job
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
     checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 2 of high memory job
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
     checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Map 4 of normal job
     checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 4 of normal job
     checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
   }
 
   private void checkRunningJobMovementAndCompletion() throws IOException {
@@ -2733,7 +2683,7 @@
   }
 
   
-  protected TaskTrackerStatus tracker(String taskTrackerName) {
+  protected TaskTracker tracker(String taskTrackerName) {
     return taskTrackerManager.getTaskTracker(taskTrackerName);
   }
   
@@ -2757,11 +2707,11 @@
   private void checkMemReservedForTasksOnTT(String taskTracker,
       Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
     Long observedMemForMapsOnTT =
-        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
-            CapacityTaskScheduler.TYPE.MAP);
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+            TaskType.MAP);
     Long observedMemForReducesOnTT =
-        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
-            CapacityTaskScheduler.TYPE.REDUCE);
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+            TaskType.REDUCE);
     if (expectedMemForMapsOnTT == null) {
       assertTrue(observedMemForMapsOnTT == null);
     } else {
@@ -2785,7 +2735,7 @@
    * @return
    */
   private void checkOccupiedSlots(String queue,
-      CapacityTaskScheduler.TYPE type, int numActiveUsers,
+      TaskType type, int numActiveUsers,
       int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
     scheduler.updateQSIInfoForTests();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
@@ -2793,9 +2743,9 @@
         queueManager.getJobQueueInfo(queue).getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
     int index = -1;
-    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+    if (type.equals(TaskType.MAP)) {
       index = 7;
-    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+    } else if (type.equals(TaskType.REDUCE)) {
       index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
     }
     LOG.info(infoStrings[index]);

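The test hunks above replace the scheduler-private CapacityTaskScheduler.TYPE enum with the shared org.apache.hadoop.mapreduce.TaskType and fetch a TaskTrackerStatus via TaskTracker.getStatus() before querying MemoryMatcher. As a minimal sketch of what the checkMemReservedForTasksOnTT assertions exercise, the standalone class below tallies reserved memory per task type; the RunningTask record and memReservedForTasks method are illustrative stand-ins, not the real MemoryMatcher API.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.TaskType;

// Illustrative only: a simplified per-type memory tally in the spirit of the
// assertions above; class and method names are hypothetical, not the actual
// MemoryMatcher API.
public class MemReservationSketch {

  // Hypothetical record of one running task attempt on a tracker.
  static class RunningTask {
    final TaskType type;
    final long memMB;
    RunningTask(TaskType type, long memMB) {
      this.type = type;
      this.memMB = memMB;
    }
  }

  // Sum the memory reserved on one tracker for tasks of the given type.
  static long memReservedForTasks(List<RunningTask> running, TaskType type) {
    long total = 0L;
    for (RunningTask t : running) {
      if (t.type == type) {
        total += t.memMB;
      }
    }
    return total;
  }

  public static void main(String[] args) {
    List<RunningTask> onTT1 = new ArrayList<RunningTask>();
    onTT1.add(new RunningTask(TaskType.MAP, 2 * 1024L));    // high-RAM map
    onTT1.add(new RunningTask(TaskType.REDUCE, 2 * 1024L)); // high-RAM reduce
    // Mirrors a check such as checkMemReservedForTasksOnTT("tt1", 2048L, 2048L).
    System.out.println(memReservedForTasks(onTT1, TaskType.MAP));    // 2048
    System.out.println(memReservedForTasks(onTT1, TaskType.REDUCE)); // 2048
  }
}
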
Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java Wed Jun 24 14:22:13 2009
@@ -37,7 +37,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.http.HttpServer;
-
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that 
@@ -257,7 +257,7 @@
   }
 
   @Override
-  public List<Task> assignTasks(TaskTrackerStatus taskTracker)
+  public List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
     return scheduler.assignTasks(taskTracker);  
   }

Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java Wed Jun 24 14:22:13 2009
@@ -41,7 +41,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
-
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that 
@@ -338,8 +338,8 @@
       TaskTrackerStatus taskTracker, int numTrackers, boolean map)
       throws IOException {
     int taskOffset = assignedTasks.size();
-    int maxTasks = (map) ? taskTracker.getMaxMapTasks() : 
-        taskTracker.getMaxReduceTasks();
+    int maxTasks = (map) ? taskTracker.getMaxMapSlots() : 
+        taskTracker.getMaxReduceSlots();
     int countTasks =  (map) ? taskTracker.countMapTasks() : 
         taskTracker.countReduceTasks();
     int availableSlots = maxTasks - countTasks;
@@ -483,7 +483,7 @@
   }
 
   @Override
-  public List<Task> assignTasks(TaskTrackerStatus taskTracker)
+  public List<Task> assignTasks(TaskTracker taskTracker)
     throws IOException {
     long millis = 0;
     if (debug) {
@@ -494,8 +494,8 @@
  
     List<Task> assignedTasks = new ArrayList<Task>();
 
-    assignMapRedTasks(assignedTasks, taskTracker, numTrackers, MAP);
-    assignMapRedTasks(assignedTasks, taskTracker, numTrackers, REDUCE);
+    assignMapRedTasks(assignedTasks, taskTracker.getStatus(), numTrackers, MAP);
+    assignMapRedTasks(assignedTasks, taskTracker.getStatus(), numTrackers, REDUCE);
     if (debug) {
       long elapsed = System.currentTimeMillis() - millis;
       LOG.debug("assigned total tasks: " + 

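The PriorityScheduler hunks above switch to the renamed TaskTrackerStatus accessors (getMaxMapSlots/getMaxReduceSlots) and unwrap the status from the new TaskTracker argument before computing free capacity. A short sketch of that free-slot arithmetic, with plain ints standing in for the status accessors:

// Illustrative only: the slot-availability arithmetic used in
// assignMapRedTasks, with plain values standing in for TaskTrackerStatus
// accessors such as getMaxMapSlots() and countMapTasks().
public class AvailableSlotsSketch {

  static int availableSlots(int maxSlots, int runningTasks) {
    // A tracker can accept at most (capacity - currently running) new tasks.
    return maxSlots - runningTasks;
  }

  public static void main(String[] args) {
    int maxMapSlots = 2;  // tracker.getMaxMapSlots()
    int runningMaps = 1;  // tracker.countMapTasks()
    System.out.println(availableSlots(maxMapSlots, runningMaps)); // 1
  }
}
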
Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java Wed Jun 24 14:22:13 2009
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.util.Collection;
 
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
 /**
  * Mock queue scheduler for testing only
  */
@@ -30,7 +32,7 @@
   }
   public void terminate() throws IOException {
   }
-  public List<Task> assignTasks(TaskTrackerStatus taskTracker)
+  public List<Task> assignTasks(TaskTracker taskTracker)
     throws IOException {
     return null;
   }

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Wed Jun 24 14:22:13 2009
@@ -40,13 +40,13 @@
   public boolean canAssignMap(TaskTrackerStatus tracker,
       int totalRunnableMaps, int totalMapSlots) {
     return tracker.countMapTasks() < getCap(totalRunnableMaps,
-        tracker.getMaxMapTasks(), totalMapSlots);
+        tracker.getMaxMapSlots(), totalMapSlots);
   }
 
   @Override
   public boolean canAssignReduce(TaskTrackerStatus tracker,
       int totalRunnableReduces, int totalReduceSlots) {
     return tracker.countReduceTasks() < getCap(totalRunnableReduces,
-        tracker.getMaxReduceTasks(), totalReduceSlots);
+        tracker.getMaxReduceSlots(), totalReduceSlots);
   }
 }

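In the CapBasedLoadManager hunk, canAssignMap (and its reduce counterpart) admits a new task only while the tracker's running count stays below a cap derived from cluster-wide demand. The body of getCap is not part of this hunk; the sketch below assumes a simple proportional cap (cluster demand ratio scaled to this tracker's capacity) purely for illustration, so both the formula and the class name are assumptions.

// Illustrative only: one plausible proportional cap, assumed for the sketch;
// the real CapBasedLoadManager.getCap implementation is not shown in this hunk.
public class CapSketch {

  static int getCap(int totalRunnable, int localMaxSlots, int totalSlots) {
    if (totalSlots == 0) {
      return localMaxSlots;
    }
    double load = (double) totalRunnable / totalSlots;  // cluster-wide demand ratio
    return (int) Math.ceil(Math.min(1.0, load) * localMaxSlots);
  }

  static boolean canAssignMap(int runningMaps, int totalRunnableMaps,
                              int localMaxMapSlots, int totalMapSlots) {
    return runningMaps < getCap(totalRunnableMaps, localMaxMapSlots, totalMapSlots);
  }

  public static void main(String[] args) {
    // 10 runnable maps across 20 cluster map slots; this tracker has 2 map
    // slots and 0 running maps, so the cap works out to 1 and it can take one.
    System.out.println(canAssignMap(0, 10, 2, 20)); // true
  }
}
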
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Jun 24 14:22:13 2009
@@ -38,6 +38,7 @@
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that implements fair sharing.
@@ -209,7 +210,7 @@
   }
   
   @Override
-  public synchronized List<Task> assignTasks(TaskTrackerStatus tracker)
+  public synchronized List<Task> assignTasks(TaskTracker tracker)
       throws IOException {
     if (!initialized) // Don't try to assign tasks if we haven't yet started up
       return null;
@@ -235,10 +236,11 @@
     // Scan to see whether any job needs to run a map, then a reduce
     ArrayList<Task> tasks = new ArrayList<Task>();
     TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
+    TaskTrackerStatus trackerStatus = tracker.getStatus();
     for (TaskType taskType: types) {
       boolean canAssign = (taskType == TaskType.MAP) ? 
-          loadMgr.canAssignMap(tracker, runnableMaps, totalMapSlots) :
-          loadMgr.canAssignReduce(tracker, runnableReduces, totalReduceSlots);
+          loadMgr.canAssignMap(trackerStatus, runnableMaps, totalMapSlots) :
+          loadMgr.canAssignReduce(trackerStatus, runnableReduces, totalReduceSlots);
       if (canAssign) {
         // Figure out the jobs that need this type of task
         List<JobInProgress> candidates = new ArrayList<JobInProgress>();
@@ -254,8 +256,8 @@
         Collections.sort(candidates, comparator);
         for (JobInProgress job: candidates) {
           Task task = (taskType == TaskType.MAP ? 
-              taskSelector.obtainNewMapTask(tracker, job) :
-              taskSelector.obtainNewReduceTask(tracker, job));
+              taskSelector.obtainNewMapTask(trackerStatus, job) :
+              taskSelector.obtainNewReduceTask(trackerStatus, job));
           if (task != null) {
             // Update the JobInfo for this job so we account for the launched
             // tasks during this update interval and don't try to launch more

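The FairScheduler hunk keeps the status-based LoadManager and TaskSelector callbacks unchanged by fetching the TaskTrackerStatus once from the new TaskTracker argument and passing it along. A minimal sketch of that adapter pattern follows; Tracker, Status and Helper are hypothetical stand-ins, not the Hadoop classes.

import java.util.ArrayList;
import java.util.List;

// Illustrative only: how a scheduler can accept the new TaskTracker handle
// while keeping its status-based helpers unchanged.
public class StatusAdapterSketch {

  interface Status { String getTrackerName(); }
  interface Tracker { Status getStatus(); }
  interface Helper { String obtainTask(Status status); }

  // Unwrap the status once, then delegate to the status-based helper.
  static List<String> assignTasks(Tracker tracker, Helper helper) {
    Status status = tracker.getStatus();
    List<String> assigned = new ArrayList<String>();
    String task = helper.obtainTask(status);
    if (task != null) {
      assigned.add(task);
    }
    return assigned;
  }

  public static void main(String[] args) {
    Tracker tt1 = new Tracker() {
      public Status getStatus() {
        return new Status() {
          public String getTrackerName() { return "tt1"; }
        };
      }
    };
    Helper mapSelector = new Helper() {
      public String obtainTask(Status status) {
        return "attempt_test_0001_m_000001_0 on " + status.getTrackerName();
      }
    };
    System.out.println(assignTasks(tt1, mapSelector));
  }
}
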
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Wed Jun 24 14:22:13 2009
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 
 public class TestFairScheduler extends TestCase {
@@ -69,7 +70,7 @@
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(true);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -84,7 +85,7 @@
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(false);
-      Task task = new ReduceTask("", attemptId, 0, 10) {
+      Task task = new ReduceTask("", attemptId, 0, 10, 1) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -115,18 +116,26 @@
     List<JobInProgressListener> listeners =
       new ArrayList<JobInProgressListener>();
     
-    private Map<String, TaskTrackerStatus> trackers =
-      new HashMap<String, TaskTrackerStatus>();
+    private Map<String, TaskTracker> trackers =
+      new HashMap<String, TaskTracker>();
     private Map<String, TaskStatus> taskStatuses = 
       new HashMap<String, TaskStatus>();
 
     public FakeTaskTrackerManager() {
-      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapTasksPerTracker, maxReduceTasksPerTracker));
-      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      TaskTracker tt1 = new TaskTracker("tt1");
+      tt1.setStatus(new TaskTrackerStatus("tt1", "tt1.host", 1,
+                                          new ArrayList<TaskStatus>(), 0,
+                                          maxMapTasksPerTracker, 
+                                          maxReduceTasksPerTracker));
+      trackers.put("tt1", tt1);
+      
+      TaskTracker tt2 = new TaskTracker("tt2");
+      tt2.setStatus(new TaskTrackerStatus("tt2", "tt2.host", 2,
+                                          new ArrayList<TaskStatus>(), 0,
+                                          maxMapTasksPerTracker, 
+                                          maxReduceTasksPerTracker));
+      trackers.put("tt2", tt2);
+
     }
     
     @Override
@@ -152,7 +161,11 @@
 
     @Override
     public Collection<TaskTrackerStatus> taskTrackers() {
-      return trackers.values();
+      List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+      for (TaskTracker tt : trackers.values()) {
+        statuses.add(tt.getStatus());
+      }
+      return statuses;
     }
 
 
@@ -189,7 +202,7 @@
       }
     }
     
-    public TaskTrackerStatus getTaskTracker(String trackerID) {
+    public TaskTracker getTaskTracker(String trackerID) {
       return trackers.get(trackerID);
     }
     
@@ -207,7 +220,7 @@
       };
       taskStatuses.put(t.getTaskID().toString(), status);
       status.setRunState(TaskStatus.State.RUNNING);
-      trackers.get(taskTrackerName).getTaskReports().add(status);
+      trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
     }
     
     public void finishTask(String taskTrackerName, String tipId) {
@@ -1220,7 +1233,7 @@
     scheduler.update();
   }
 
-  protected TaskTrackerStatus tracker(String taskTrackerName) {
+  protected TaskTracker tracker(String taskTrackerName) {
     return taskTrackerManager.getTaskTracker(taskTrackerName);
   }
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Wed Jun 24 14:22:13 2009
@@ -62,8 +62,9 @@
    * Version 24: Changed format of Task and TaskStatus for HADOOP-4759 
    * Version 25: JobIDs are passed in response to JobTracker restart 
    * Version 26: Modified TaskID to be aware of the new TaskTypes
+   * Version 27: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    */
-  public static final long versionID = 26L;
+  public static final long versionID = 27L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Jun 24 14:22:13 2009
@@ -185,7 +185,7 @@
     BytesWritable split = new BytesWritable();
     split.readFields(splitFile);
     splitFile.close();
-    Task task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
+    Task task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, 1);
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());
     return true;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Wed Jun 24 14:22:13 2009
@@ -1514,6 +1514,39 @@
     return val;
   }
 
+  /**
+   * Compute the number of slots required to run a single map task-attempt
+   * of this job.
+   * @param slotSizePerMap cluster-wide value of the amount of memory required
+   *                       to run a map-task
+   * @return the number of slots required to run a single map task-attempt
+   *          1 if memory parameters are disabled.
+   */
+  int computeNumSlotsPerMap(long slotSizePerMap) {
+    if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) ||
+        (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) {
+      return 1;
+    }
+    return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap));
+  }
+  
+  /**
+   * Compute the number of slots required to run a single reduce task-attempt
+   * of this job.
+   * @param slotSizePerReduce cluster-wide value of the amount of memory 
+   *                          required to run a reduce-task
+   * @return the number of slots required to run a single reduce task-attempt
+   *          1 if memory parameters are disabled
+   */
+  int computeNumSlotsPerReduce(long slotSizePerReduce) {
+    if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) ||
+        (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) {
+      return 1;
+    }
+    return 
+    (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
+  }
+  
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing

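The new JobConf helpers round a job's per-task memory requirement up to a whole number of scheduler slots and fall back to a single slot when memory limits are disabled. A standalone sketch of that ceiling computation, using plain longs in place of a JobConf instance; the DISABLED constant here stands in for JobConf.DISABLED_MEMORY_LIMIT.

// Illustrative only: the ceil-based slot computation performed by the new
// JobConf helpers, written against plain values instead of a JobConf.
public class SlotCountSketch {

  static final long DISABLED = -1L;  // stand-in for JobConf.DISABLED_MEMORY_LIMIT

  static int slotsForTask(long memPerTaskMB, long slotSizeMB) {
    if (memPerTaskMB == DISABLED || slotSizeMB == DISABLED) {
      return 1;  // memory parameters disabled: every attempt takes one slot
    }
    return (int) Math.ceil((double) memPerTaskMB / (double) slotSizeMB);
  }

  public static void main(String[] args) {
    System.out.println(slotsForTask(2 * 1024L, 1024L)); // high-RAM map: 2 slots
    System.out.println(slotsForTask(1 * 1024L, 1024L)); // normal map: 1 slot
    System.out.println(slotsForTask(DISABLED, 1024L));  // limits disabled: 1 slot
  }
}

For example, the high-RAM jobs exercised in the scheduler tests above (2 GB per task on a 1 GB slot size) require two slots per attempt, which is why a single such task occupies two map or reduce slots in the occupied-slot checks.
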

