hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: yhema...@apache.org
Subject: svn commit: r788036 [2/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapr...
Date: Wed, 24 Jun 2009 14:22:15 GMT
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jun 24 14:22:13 2009
@@ -51,6 +51,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -58,8 +59,10 @@
  * and its latest JobStatus, plus a set of tables for 
  * doing bookkeeping of its Tasks.
  * ***********************************************************
+ * 
+ * This is NOT a public interface!
  */
-class JobInProgress {
+public class JobInProgress {
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
     
   JobProfile profile;
@@ -74,6 +77,8 @@
   TaskInProgress setup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
+  int numSlotsPerMap = 1;
+  int numSlotsPerReduce = 1;
   
   // Counters to track currently running/finished/failed Map/Reduce task-attempts
   int runningMapTasks = 0;
@@ -234,6 +239,37 @@
   //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
   private DataStatistics runningReduceTaskStats = new DataStatistics();
  
+  private static class FallowSlotInfo {
+    long timestamp;
+    int numSlots;
+    
+    public FallowSlotInfo(long timestamp, int numSlots) {
+      this.timestamp = timestamp;
+      this.numSlots = numSlots;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+    }
+
+    public int getNumSlots() {
+      return numSlots;
+    }
+
+    public void setNumSlots(int numSlots) {
+      this.numSlots = numSlots;
+    }
+  }
+  
+  private Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps = 
+    new HashMap<TaskTracker, FallowSlotInfo>();
+  private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
+    new HashMap<TaskTracker, FallowSlotInfo>();
+  
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
    */
@@ -445,6 +481,41 @@
   }
 
   /**
+   * Get the number of slots required to run a single map task-attempt.
+   * @return the number of slots required to run a single map task-attempt
+   */
+  synchronized int getNumSlotsPerMap() {
+    return numSlotsPerMap;
+  }
+
+  /**
+   * Set the number of slots required to run a single map task-attempt.
+   * This is typically set by schedulers which support high-ram jobs.
+   * @param slots the number of slots required to run a single map task-attempt
+   */
+  synchronized void setNumSlotsPerMap(int numSlotsPerMap) {
+    this.numSlotsPerMap = numSlotsPerMap;
+  }
+
+  /**
+   * Get the number of slots required to run a single reduce task-attempt.
+   * @return the number of slots required to run a single reduce task-attempt
+   */
+  synchronized int getNumSlotsPerReduce() {
+    return numSlotsPerReduce;
+  }
+
+  /**
+   * Set the number of slots required to run a single reduce task-attempt.
+   * This is typically set by schedulers which support high-ram jobs.
+   * @param slots the number of slots required to run a single reduce 
+   *              task-attempt
+   */
+  synchronized void setNumSlotsPerReduce(int numSlotsPerReduce) {
+    this.numSlotsPerReduce = numSlotsPerReduce;
+  }
+
+  /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    */
@@ -501,7 +572,7 @@
       inputLength += splits[i].getDataLength();
       maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i], 
-                                   jobtracker, conf, this, i);
+                                   jobtracker, conf, this, i, numSlotsPerMap);
     }
     LOG.info("Input size for job " + jobId + " = " + inputLength
         + ". Number of splits = " + splits.length);
@@ -519,7 +590,7 @@
     for (int i = 0; i < numReduceTasks; i++) {
       reduces[i] = new TaskInProgress(jobId, jobFile, 
                                       numMapTasks, i, 
-                                      jobtracker, conf, this);
+                                      jobtracker, conf, this, numSlotsPerReduce);
       nonRunningReduces.add(reduces[i]);
     }
 
@@ -538,12 +609,12 @@
     // split.
     JobClient.RawSplit emptySplit = new JobClient.RawSplit();
     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
-            jobtracker, conf, this, numMapTasks);
+            jobtracker, conf, this, numMapTasks, 1);
     cleanup[0].setJobCleanupTask();
 
     // cleanup reduce tip.
     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
-                       numReduceTasks, jobtracker, conf, this);
+                       numReduceTasks, jobtracker, conf, this, 1);
     cleanup[1].setJobCleanupTask();
 
     // create two setup tips, one map and one reduce.
@@ -552,12 +623,12 @@
     // setup map tip. This map doesn't use any split. Just assign an empty
     // split.
     setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
-            jobtracker, conf, this, numMapTasks + 1 );
+            jobtracker, conf, this, numMapTasks + 1, 1);
     setup[0].setJobSetupTask();
 
     // setup reduce tip.
     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
-                       numReduceTasks + 1, jobtracker, conf, this);
+                       numReduceTasks + 1, jobtracker, conf, this, 1);
     setup[1].setJobSetupTask();
     
     synchronized(jobInitKillStatus){
@@ -618,6 +689,15 @@
     return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
     finishedReduceTasks + speculativeReduceTasks;
   }
+  public synchronized int getNumSlotsPerTask(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return numSlotsPerMap;
+    } else if (taskType == TaskType.REDUCE) {
+      return numSlotsPerReduce;
+    } else {
+      return 1;
+    }
+  }
   public JobPriority getPriority() {
     return this.priority;
   }
@@ -824,8 +904,10 @@
     if (change) {
       TaskStatus.State state = status.getRunState();
       // get the TaskTrackerStatus where the task ran 
-      TaskTrackerStatus ttStatus = 
+      TaskTracker taskTracker = 
         this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
+      TaskTrackerStatus ttStatus = 
+        (taskTracker == null) ? null : taskTracker.getStatus();
       String httpTaskLogLocation = null; 
 
       if (null != ttStatus){
@@ -887,7 +969,7 @@
         }
         
         // Tell the job to fail the relevant task
-        failedTask(tip, taskid, status, ttStatus,
+        failedTask(tip, taskid, status, taskTracker,
                    wasRunning, wasComplete);
 
         // Did the task failure lead to tip failure?
@@ -1424,9 +1506,9 @@
    * to the blacklist iff too many trackers in the cluster i.e. 
    * (clusterSize * CLUSTER_BLACKLIST_PERCENT) haven't turned 'flaky' already.
    * 
-   * @param trackerName task-tracker on which a task failed
+   * @param taskTracker task-tracker on which a task failed
    */
-  void addTrackerTaskFailure(String trackerName) {
+  void addTrackerTaskFailure(String trackerName, TaskTracker taskTracker) {
     if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { 
       String trackerHostName = convertTrackerNameToHostName(trackerName);
 
@@ -1439,11 +1521,78 @@
       // Check if this tasktracker has turned 'flaky'
       if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
         ++flakyTaskTrackers;
+        
+        // Cancel reservations if appropriate
+        if (taskTracker != null) {
+          taskTracker.unreserveSlots(TaskType.MAP, this);
+          taskTracker.unreserveSlots(TaskType.REDUCE, this);
+        }
         LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
       }
     }
   }
+  
+  public synchronized void reserveTaskTracker(TaskTracker taskTracker,
+                                              TaskType type, int numSlots) {
+    Map<TaskTracker, FallowSlotInfo> map =
+      (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
+    
+    long now = System.currentTimeMillis();
+    
+    FallowSlotInfo info = map.get(taskTracker);
+    if (info == null) {
+      info = new FallowSlotInfo(now, numSlots);
+    } else {
+      // Increment metering info if the reservation is changing
+      if (info.getNumSlots() != numSlots) {
+        Enum<JobCounter> counter = 
+          (type == TaskType.MAP) ? 
+              JobCounter.FALLOW_SLOTS_MILLIS_MAPS : 
+              JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
+        long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
+        jobCounters.incrCounter(counter, fallowSlotMillis);
+        
+        // Update 
+        info.setTimestamp(now);
+        info.setNumSlots(numSlots);
+      }
+    }
+    map.put(taskTracker, info);
+  }
+  
+  public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
+                                                TaskType type) {
+    Map<TaskTracker, FallowSlotInfo> map =
+      (type == TaskType.MAP) ? trackersReservedForMaps : 
+                               trackersReservedForReduces;
+
+    FallowSlotInfo info = map.get(taskTracker);
+    if (info == null) {
+      LOG.warn("Cannot find information about fallow slots for " + 
+               taskTracker.getTrackerName());
+      return;
+    }
     
+    long now = System.currentTimeMillis();
+
+    Enum<JobCounter> counter = 
+      (type == TaskType.MAP) ? 
+          JobCounter.FALLOW_SLOTS_MILLIS_MAPS : 
+          JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
+    long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
+    jobCounters.incrCounter(counter, fallowSlotMillis);
+
+    map.remove(taskTracker);
+  }
+  
+  public int getNumReservedTaskTrackersForMaps() {
+    return trackersReservedForMaps.size();
+  }
+  
+  public int getNumReservedTaskTrackersForReduces() {
+    return trackersReservedForReduces.size();
+  }
+  
   private int getTrackerTaskFailures(String trackerName) {
     String trackerHostName = convertTrackerNameToHostName(trackerName);
     Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
@@ -2263,7 +2412,7 @@
 
     // Update jobhistory 
     TaskTrackerStatus ttStatus = 
-      this.jobtracker.getTaskTracker(status.getTaskTracker());
+      this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
     String taskType = getTaskType(tip);
     if (status.getIsMap()){
@@ -2421,6 +2570,7 @@
         this.status.setReduceProgress(1.0f);
       }
       this.finishTime = jobtracker.getClock().getTime();
+      cancelReservedSlots();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
@@ -2504,9 +2654,20 @@
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
+      
+      // Clear out reserved tasktrackers
+      cancelReservedSlots();
     }
   }
 
+  private void cancelReservedSlots() {
+    for (TaskTracker tt : trackersReservedForMaps.keySet()) {
+      tt.unreserveSlots(TaskType.MAP, this);
+    }
+    for (TaskTracker tt : trackersReservedForReduces.keySet()) {
+      tt.unreserveSlots(TaskType.REDUCE, this);
+    }
+  }
   private void clearUncleanTasks() {
     TaskAttemptID taskid = null;
     TaskInProgress tip = null;
@@ -2580,7 +2741,7 @@
    */
   private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
                           TaskStatus status, 
-                          TaskTrackerStatus taskTrackerStatus,
+                          TaskTracker taskTracker,
                           boolean wasRunning, boolean wasComplete) {
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     // check if the TIP is already failed
@@ -2642,6 +2803,8 @@
     String taskTrackerName = taskStatus.getTaskTracker();
     String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
     int taskTrackerPort = -1;
+    TaskTrackerStatus taskTrackerStatus = 
+      (taskTracker == null) ? null : taskTracker.getStatus();
     if (taskTrackerStatus != null) {
       taskTrackerPort = taskTrackerStatus.getHttpPort();
     }
@@ -2687,7 +2850,7 @@
     // Note down that a task has failed on this tasktracker 
     //
     if (status.getRunState() == TaskStatus.State.FAILED) { 
-      addTrackerTaskFailure(taskTrackerName);
+      addTrackerTaskFailure(taskTrackerName, taskTracker);
     }
         
     //
@@ -2778,6 +2941,9 @@
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
                                                     taskid,
                                                     0.0f,
+                                                    tip.isMapTask() ? 
+                                                        numSlotsPerMap : 
+                                                        numSlotsPerReduce,
                                                     state,
                                                     reason,
                                                     reason,

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Wed Jun 24 14:22:13 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO
@@ -77,9 +78,9 @@
   }
 
   @Override
-  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
-
+    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     final int numTaskTrackers = clusterStatus.getTaskTrackers();
     final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
@@ -91,10 +92,10 @@
     //
     // Get map + reduce counts for the current tracker.
     //
-    final int trackerMapCapacity = taskTracker.getMaxMapTasks();
-    final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
-    final int trackerRunningMaps = taskTracker.countMapTasks();
-    final int trackerRunningReduces = taskTracker.countReduceTasks();
+    final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
+    final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
+    final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
+    final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();
 
     // Assigned tasks
     List<Task> assignedTasks = new ArrayList<Task>();
@@ -167,7 +168,7 @@
           
           // Try to schedule a node-local or rack-local Map task
           t = 
-            job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
+            job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                       taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
             assignedTasks.add(t);
@@ -186,7 +187,7 @@
           
           // Try to schedule a node-local or rack-local Map task
           t = 
-            job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
+            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                    taskTrackerManager.getNumberOfUniqueHosts());
           
           if (t != null) {
@@ -224,7 +225,7 @@
           }
 
           Task t = 
-            job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+            job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, 
                                     taskTrackerManager.getNumberOfUniqueHosts()
                                     );
           if (t != null) {
@@ -243,7 +244,7 @@
     }
     
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
+      LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
                 "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + 
                 trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + 
                 (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jun 24 14:22:13 2009
@@ -85,6 +85,9 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
 /*******************************************************
  * JobTracker is the central location for submitting and 
  * tracking MR jobs in a network environment.
@@ -267,7 +270,8 @@
                     JobInProgress job = tip.getJob();
                     String trackerName = getAssignedTracker(taskId);
                     TaskTrackerStatus trackerStatus = 
-                      getTaskTracker(trackerName);
+                      getTaskTrackerStatus(trackerName); 
+                      
                     // This might happen when the tasktracker has already
                     // expired and this thread tries to call failedtask
                     // again. expire tasktracker should have called failed
@@ -348,22 +352,24 @@
                 long now = clock.getTime();
                 TaskTrackerStatus leastRecent = null;
                 while ((trackerExpiryQueue.size() > 0) &&
-                       ((leastRecent = trackerExpiryQueue.first()) != null) &&
-                       (now - leastRecent.getLastSeen() > tasktrackerExpiryInterval)) {
+                       (leastRecent = trackerExpiryQueue.first()) != null &&
+                       ((now - leastRecent.getLastSeen()) > tasktrackerExpiryInterval)) {
                         
                   // Remove profile from head of queue
                   trackerExpiryQueue.remove(leastRecent);
                   String trackerName = leastRecent.getTrackerName();
                         
                   // Figure out if last-seen time should be updated, or if tracker is dead
-                  TaskTrackerStatus newProfile = taskTrackers.get(leastRecent.getTrackerName());
+                  TaskTracker current = getTaskTracker(trackerName);
+                  TaskTrackerStatus newProfile = 
+                    (current == null ) ? null : current.getStatus();
                   // Items might leave the taskTracker set through other means; the
                   // status stored in 'taskTrackers' might be null, which means the
                   // tracker has already been destroyed.
                   if (newProfile != null) {
-                    if (now - newProfile.getLastSeen() > tasktrackerExpiryInterval) {
+                    if ((now - newProfile.getLastSeen()) > tasktrackerExpiryInterval) {
                       // Remove completely after marking the tasks as 'KILLED'
-                      lostTaskTracker(leastRecent.getTrackerName());
+                      lostTaskTracker(current);
                       // tracker is lost, and if it is blacklisted, remove 
                       // it from the count of blacklisted trackers in the cluster
                       if (isBlacklisted(trackerName)) {
@@ -373,7 +379,7 @@
                       
                       // remove the mapping from the hosts list
                       String hostname = newProfile.getHost();
-                      hostnameToTrackerName.get(hostname).remove(trackerName);
+                      hostnameToTaskTracker.get(hostname).remove(trackerName);
                     } else {
                       // Update time by inserting latest profile
                       trackerExpiryQueue.add(newProfile);
@@ -625,9 +631,9 @@
       synchronized (taskTrackers) {
         // remove the capacity of trackers on this host
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
-          int mapSlots = status.getMaxMapTasks();
+          int mapSlots = status.getMaxMapSlots();
           totalMapTaskCapacity -= mapSlots;
-          int reduceSlots = status.getMaxReduceTasks();
+          int reduceSlots = status.getMaxReduceSlots();
           totalReduceTaskCapacity -= reduceSlots;
           getInstrumentation().addBlackListedMapSlots(
               mapSlots);
@@ -644,9 +650,9 @@
         int numTrackersOnHost = 0;
         // add the capacity of trackers on the host
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
-          int mapSlots = status.getMaxMapTasks();
+          int mapSlots = status.getMaxMapSlots();
           totalMapTaskCapacity += mapSlots;
-          int reduceSlots = status.getMaxReduceTasks();
+          int reduceSlots = status.getMaxReduceSlots();
           totalReduceTaskCapacity += reduceSlots;
           numTrackersOnHost++;
           getInstrumentation().decBlackListedMapSlots(mapSlots);
@@ -694,7 +700,8 @@
   private List<TaskTrackerStatus> getStatusesOnHost(String hostName) {
     List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
     synchronized (taskTrackers) {
-      for (TaskTrackerStatus status : taskTrackers.values()) {
+      for (TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus(); 
         if (hostName.equals(status.getHost())) {
           statuses.add(status);
         }
@@ -1015,14 +1022,14 @@
       // II. Create the (appropriate) task status
       if (type.equals(Values.MAP.name())) {
         taskStatus = 
-          new MapTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, 
-                            "", "", trackerName, TaskStatus.Phase.MAP, 
-                            new Counters());
+          new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP),
+                            TaskStatus.State.RUNNING, "", "", trackerName, 
+                            TaskStatus.Phase.MAP, new Counters());
       } else {
         taskStatus = 
-          new ReduceTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, 
-                               "", "", trackerName, TaskStatus.Phase.REDUCE, 
-                               new Counters());
+          new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE), 
+                               TaskStatus.State.RUNNING, "", "", trackerName, 
+                               TaskStatus.Phase.REDUCE, new Counters());
       }
 
       // Set the start time
@@ -1041,10 +1048,13 @@
         synchronized (taskTrackers) {
           synchronized (trackerExpiryQueue) {
             // IV. Register a new tracker
-            boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
+            TaskTracker taskTracker = getTaskTracker(trackerName);
+            boolean isTrackerRegistered =  (taskTracker != null);
             if (!isTrackerRegistered) {
               markTracker(trackerName); // add the tracker to recovery-manager
-              addNewTracker(ttStatus);
+              taskTracker = new TaskTracker(trackerName);
+              taskTracker.setStatus(ttStatus);
+              addNewTracker(taskTracker);
             }
       
             // V. Update the tracker status
@@ -1398,17 +1408,17 @@
         // are updated
         int size = trackerExpiryQueue.size();
         for (int i = 0; i < size ; ++i) {
-          // Get the first status
-          TaskTrackerStatus status = trackerExpiryQueue.first();
+          // Get the first tasktracker
+          TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
 
           // Remove it
-          trackerExpiryQueue.remove(status);
+          trackerExpiryQueue.remove(taskTracker);
 
           // Set the new time
-          status.setLastSeen(recoveryProcessEndTime);
+          taskTracker.setLastSeen(recoveryProcessEndTime);
 
           // Add back to get the sorted list
-          trackerExpiryQueue.add(status);
+          trackerExpiryQueue.add(taskTracker);
         }
       }
 
@@ -1493,11 +1503,11 @@
   Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
   
-  // (hostname --> Set(trackername))
+  // (hostname --> Set(tasktracker))
   // This is used to keep track of all trackers running on one host. While
   // decommissioning the host, all the trackers on the host will be lost.
-  Map<String, Set<String>> hostnameToTrackerName = 
-    Collections.synchronizedMap(new TreeMap<String, Set<String>>());
+  Map<String, Set<TaskTracker>> hostnameToTaskTracker = 
+    Collections.synchronizedMap(new TreeMap<String, Set<TaskTracker>>());
   
   // Number of resolved entries
   int numResolved;
@@ -1510,8 +1520,8 @@
   //
   int totalMaps = 0;
   int totalReduces = 0;
-  private HashMap<String, TaskTrackerStatus> taskTrackers =
-    new HashMap<String, TaskTrackerStatus>();
+  private HashMap<String, TaskTracker> taskTrackers =
+    new HashMap<String, TaskTracker>();
   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
   ExpireTrackers expireTrackers = new ExpireTrackers();
   Thread expireTrackersThread = null;
@@ -1526,7 +1536,7 @@
   RecoveryManager recoveryManager;
 
   /**
-   * It might seem like a bug to maintain a TreeSet of status objects,
+   * It might seem like a bug to maintain a TreeSet of tasktracker objects,
    * which can be updated at any time.  But that's not what happens!  We
    * only update status objects in the taskTrackers table.  Status objects
    * are never updated once they enter the expiry queue.  Instead, we wait
@@ -2326,9 +2336,15 @@
    * @return {@link Collection} of {@link TaskTrackerStatus} 
    */
   public Collection<TaskTrackerStatus> taskTrackers() {
+    Collection<TaskTrackerStatus> ttStatuses;
     synchronized (taskTrackers) {
-      return taskTrackers.values();
+      ttStatuses = 
+        new ArrayList<TaskTrackerStatus>(taskTrackers.values().size());
+      for (TaskTracker tt : taskTrackers.values()) {
+        ttStatuses.add(tt.getStatus());
+      }
     }
+    return ttStatuses;
   }
   
   /**
@@ -2340,7 +2356,8 @@
     Collection<TaskTrackerStatus> activeTrackers = 
       new ArrayList<TaskTrackerStatus>();
     synchronized (taskTrackers) {
-      for (TaskTrackerStatus status : taskTrackers.values()) {
+      for ( TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus();
         if (!faultyTrackers.isBlacklisted(status.getHost())) {
           activeTrackers.add(status);
         }
@@ -2361,7 +2378,8 @@
     List<String> blacklistedTrackers = 
       new ArrayList<String>();
     synchronized (taskTrackers) {
-      for (TaskTrackerStatus status : taskTrackers.values()) {
+      for (TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus();
         if (!faultyTrackers.isBlacklisted(status.getHost())) {
           activeTrackers.add(status.getTrackerName());
         } else {
@@ -2384,7 +2402,8 @@
     Collection<TaskTrackerStatus> blacklistedTrackers = 
       new ArrayList<TaskTrackerStatus>();
     synchronized (taskTrackers) {
-      for (TaskTrackerStatus status : taskTrackers.values()) {
+      for (TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus(); 
         if (faultyTrackers.isBlacklisted(status.getHost())) {
           blacklistedTrackers.add(status);
         }
@@ -2414,14 +2433,22 @@
    * @return true if blacklisted, false otherwise
    */
   public boolean isBlacklisted(String trackerID) {
-    TaskTrackerStatus status = getTaskTracker(trackerID);
+    TaskTrackerStatus status = getTaskTrackerStatus(trackerID);
     if (status != null) {
       return faultyTrackers.isBlacklisted(status.getHost());
     }
     return false;
   }
   
-  public TaskTrackerStatus getTaskTracker(String trackerID) {
+  public TaskTrackerStatus getTaskTrackerStatus(String trackerID) {
+    TaskTracker taskTracker;
+    synchronized (taskTrackers) {
+      taskTracker = taskTrackers.get(trackerID);
+    }
+    return (taskTracker == null) ? null : taskTracker.getStatus();
+  }
+
+  public TaskTracker getTaskTracker(String trackerID) {
     synchronized (taskTrackers) {
       return taskTrackers.get(trackerID);
     }
@@ -2435,7 +2462,8 @@
    * 
    * @param status Task Tracker's status
    */
-  private void addNewTracker(TaskTrackerStatus status) {
+  private void addNewTracker(TaskTracker taskTracker) {
+    TaskTrackerStatus status = taskTracker.getStatus();
     trackerExpiryQueue.add(status);
 
     //  Register the tracker if its not registered
@@ -2446,14 +2474,14 @@
     }
 
     // add it to the set of tracker per host
-    Set<String> trackers = hostnameToTrackerName.get(hostname);
+    Set<TaskTracker> trackers = hostnameToTaskTracker.get(hostname);
     if (trackers == null) {
-      trackers = Collections.synchronizedSet(new HashSet<String>());
-      hostnameToTrackerName.put(hostname, trackers);
+      trackers = Collections.synchronizedSet(new HashSet<TaskTracker>());
+      hostnameToTaskTracker.put(hostname, trackers);
     }
     LOG.info("Adding tracker " + status.getTrackerName() + " to host " 
              + hostname);
-    trackers.add(status.getTrackerName());
+    trackers.add(taskTracker);
   }
 
   public Node resolveAndAddToTopology(String name) {
@@ -2565,11 +2593,13 @@
                                                   boolean acceptNewTasks, 
                                                   short responseId) 
     throws IOException {
-    LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
-              " (restarted: " + restarted + 
-              " initialContact: " + initialContact + 
-              " acceptNewTasks: " + acceptNewTasks + ")" +
-              " with responseId: " + responseId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
+                " (restarted: " + restarted + 
+                " initialContact: " + initialContact + 
+                " acceptNewTasks: " + acceptNewTasks + ")" +
+                " with responseId: " + responseId);
+    }
 
     // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
     if (!acceptTaskTracker(status)) {
@@ -2645,13 +2675,13 @@
       
     // Check for new tasks to be executed on the tasktracker
     if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
-      TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
+      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
         List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
         if (tasks == null ) {
-          tasks = taskScheduler.assignTasks(taskTrackerStatus);
+          tasks = taskScheduler.assignTasks(getTaskTracker(trackerName));
         }
         if (tasks != null) {
           for (Task task : tasks) {
@@ -2751,14 +2781,15 @@
    */
   private boolean updateTaskTrackerStatus(String trackerName,
                                           TaskTrackerStatus status) {
-    TaskTrackerStatus oldStatus = taskTrackers.get(trackerName);
+    TaskTracker tt = getTaskTracker(trackerName);
+    TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
-        int mapSlots = oldStatus.getMaxMapTasks();
+        int mapSlots = oldStatus.getMaxMapSlots();
         totalMapTaskCapacity -= mapSlots;
-        int reduceSlots = oldStatus.getMaxReduceTasks();
+        int reduceSlots = oldStatus.getMaxReduceSlots();
         totalReduceTaskCapacity -= reduceSlots;
       }
       if (status == null) {
@@ -2778,16 +2809,56 @@
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
-        int mapSlots = status.getMaxMapTasks();
+        int mapSlots = status.getMaxMapSlots();
         totalMapTaskCapacity += mapSlots;
-        int reduceSlots = status.getMaxReduceTasks();
+        int reduceSlots = status.getMaxReduceSlots();
         totalReduceTaskCapacity += reduceSlots;
       }
       boolean alreadyPresent = false;
-      if (taskTrackers.containsKey(trackerName)) {
+      TaskTracker taskTracker = taskTrackers.get(trackerName);
+      if (taskTracker != null) {
         alreadyPresent = true;
+      } else {
+        taskTracker = new TaskTracker(trackerName);
+      }
+      
+      taskTracker.setStatus(status);
+      taskTrackers.put(trackerName, taskTracker);
+      
+      if (LOG.isDebugEnabled()) {
+        int runningMaps = 0, runningReduces = 0;
+        int commitPendingMaps = 0, commitPendingReduces = 0;
+        int unassignedMaps = 0, unassignedReduces = 0;
+        int miscMaps = 0, miscReduces = 0;
+        List<TaskStatus> taskReports = status.getTaskReports();
+        for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+          TaskStatus ts = it.next();
+          boolean isMap = ts.getIsMap();
+          TaskStatus.State state = ts.getRunState();
+          if (state == TaskStatus.State.RUNNING) {
+            if (isMap) { ++runningMaps; }
+            else { ++runningReduces; }
+          } else if (state == TaskStatus.State.UNASSIGNED) {
+            if (isMap) { ++unassignedMaps; }
+            else { ++unassignedReduces; }
+          } else if (state == TaskStatus.State.COMMIT_PENDING) {
+            if (isMap) { ++commitPendingMaps; }
+            else { ++commitPendingReduces; }
+          } else {
+            if (isMap) { ++miscMaps; } 
+            else { ++miscReduces; } 
+          }
+        }
+        LOG.debug(trackerName + ": Status -" +
+                  " running(m) = " + runningMaps + 
+                  " unassigned(m) = " + unassignedMaps + 
+                  " commit_pending(m) = " + commitPendingMaps +
+                  " misc(m) = " + miscMaps +
+                  " running(r) = " + runningReduces + 
+                  " unassigned(r) = " + unassignedReduces + 
+                  " commit_pending(r) = " + commitPendingReduces +
+                  " misc(r) = " + miscReduces); 
       }
-      taskTrackers.put(trackerName, status);
 
       if (!alreadyPresent)  {
         Integer numTaskTrackersInHost = 
@@ -2816,11 +2887,12 @@
       synchronized (trackerExpiryQueue) {
         boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                      trackerStatus);
+        TaskTracker taskTracker = getTaskTracker(trackerName);
         if (initialContact) {
           // If it's first contact, then clear out 
           // any state hanging around
           if (seenBefore) {
-            lostTaskTracker(trackerName);
+            lostTaskTracker(taskTracker);
           }
         } else {
           // If not first contact, there should be some record of the tracker
@@ -2837,7 +2909,7 @@
           if (isBlacklisted(trackerName)) {
             faultyTrackers.numBlacklistedTrackers += 1;
           }
-          addNewTracker(trackerStatus);
+          addNewTracker(taskTracker);
         }
       }
     }
@@ -2956,8 +3028,8 @@
   // returns cleanup tasks first, then setup tasks.
   private synchronized List<Task> getSetupAndCleanupTasks(
     TaskTrackerStatus taskTracker) throws IOException {
-    int maxMapTasks = taskTracker.getMaxMapTasks();
-    int maxReduceTasks = taskTracker.getMaxReduceTasks();
+    int maxMapTasks = taskTracker.getMaxMapSlots();
+    int maxReduceTasks = taskTracker.getMaxReduceSlots();
     int numMaps = taskTracker.countMapTasks();
     int numReduces = taskTracker.countReduceTasks();
     int numTaskTrackers = getClusterStatus().getTaskTrackers();
@@ -3635,7 +3707,8 @@
    * already been updated.  Just process the contained tasks and any
    * jobs that might be affected.
    */
-  void lostTaskTracker(String trackerName) {
+  void lostTaskTracker(TaskTracker taskTracker) {
+    String trackerName = taskTracker.getTrackerName();
     LOG.info("Lost tracker '" + trackerName + "'");
     
     // remove the tracker from the local structures
@@ -3693,10 +3766,14 @@
       
       // Penalize this tracker for each of the jobs which   
       // had any tasks running on it when it was 'lost' 
+      // Also, remove any reserved slots on this tasktracker
       for (JobInProgress job : jobsWithFailures) {
-        job.addTrackerTaskFailure(trackerName);
+        job.addTrackerTaskFailure(trackerName, taskTracker);
       }
-      
+
+      // Cleanup
+      taskTracker.lost();
+
       // Purge 'marked' tasks, needs to be done  
       // here to prevent hanging references!
       removeMarkedTasks(trackerName);
@@ -3726,9 +3803,9 @@
     hostsReader.refresh();
     
     Set<String> excludeSet = new HashSet<String>();
-    for(Map.Entry<String, TaskTrackerStatus> eSet : taskTrackers.entrySet()) {
+    for(Map.Entry<String, TaskTracker> eSet : taskTrackers.entrySet()) {
       String trackerName = eSet.getKey();
-      TaskTrackerStatus status = eSet.getValue();
+      TaskTrackerStatus status = eSet.getValue().getStatus();
       // Check if not include i.e not in host list or in hosts list but excluded
       if (!inHostsList(status) || inExcludedHostsList(status)) {
           excludeSet.add(status.getHost()); // add to rejected trackers
@@ -3746,12 +3823,13 @@
       synchronized (trackerExpiryQueue) {
         for (String host : hosts) {
           LOG.info("Decommissioning host " + host);
-          Set<String> trackers = hostnameToTrackerName.remove(host);
+          Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
           if (trackers != null) {
-            for (String tracker : trackers) {
-              LOG.info("Losing tracker " + tracker + " on host " + host);
+            for (TaskTracker tracker : trackers) {
+              LOG.info("Decommission: Losing tracker " +
+                       tracker.getTrackerName() + " on host " + host);
               lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(tracker, null);
+              updateTaskTrackerStatus(tracker.getTrackerName(), null);
             }
           }
           LOG.info("Host " + host + " is ready for decommissioning");

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Wed Jun 24 14:22:13 2009
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that limits the maximum number of tasks
@@ -69,9 +70,9 @@
   }
 
   @Override
-  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
-
+    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
     final int numTaskTrackers =
         taskTrackerManager.getClusterStatus().getTaskTrackers();
     Collection<JobInProgress> jobQueue =
@@ -79,10 +80,10 @@
     Task task;
 
     /* Stats about the current taskTracker */
-    final int mapTasksNumber = taskTracker.countMapTasks();
-    final int reduceTasksNumber = taskTracker.countReduceTasks();
-    final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
-    final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
+    final int mapTasksNumber = taskTrackerStatus.countMapTasks();
+    final int reduceTasksNumber = taskTrackerStatus.countReduceTasks();
+    final int maximumMapTasksNumber = taskTrackerStatus.getMaxMapSlots();
+    final int maximumReduceTasksNumber = taskTrackerStatus.getMaxReduceSlots();
 
     /*
      * Statistics about the whole cluster. Most are approximate because of
@@ -141,11 +142,11 @@
             continue;
           }
           if (step == 0 || step == 2) {
-            task = job.obtainNewMapTask(taskTracker, numTaskTrackers,
+            task = job.obtainNewMapTask(taskTrackerStatus, numTaskTrackers,
                 taskTrackerManager.getNumberOfUniqueHosts());
           }
           else {
-            task = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+            task = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                 taskTrackerManager.getNumberOfUniqueHosts());
           }
           if (task != null) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Jun 24 14:22:13 2009
@@ -168,7 +168,7 @@
             MapTask map = new MapTask(file.toString(),  
                                       mapId, i,
                                       rawSplits[i].getClassName(),
-                                      rawSplits[i].getBytes());
+                                      rawSplits[i].getBytes(), 1);
             JobConf localConf = new JobConf(job);
             map.setJobFile(localFile.toString());
             map.localizeConfiguration(localConf);
@@ -207,7 +207,7 @@
             }
             if (!this.isInterrupted()) {
               ReduceTask reduce = new ReduceTask(file.toString(), 
-                                                 reduceId, 0, mapIds.size());
+                                                 reduceId, 0, mapIds.size(), 1);
               JobConf localConf = new JobConf(job);
               reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jun 24 14:22:13 2009
@@ -88,9 +88,9 @@
   }
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
-                 int partition, String splitClass, BytesWritable split
-                 ) {
-    super(jobFile, taskId, partition);
+                 int partition, String splitClass, BytesWritable split,
+                 int numSlotsRequired) {
+    super(jobFile, taskId, partition, numSlotsRequired);
     this.splitClass = splitClass;
     this.split = split;
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java Wed Jun 24 14:22:13 2009
@@ -30,10 +30,10 @@
   
   public MapTaskStatus() {}
 
-  public MapTaskStatus(TaskAttemptID taskid, float progress,
+  public MapTaskStatus(TaskAttemptID taskid, float progress, int numSlots,
           State runState, String diagnosticInfo, String stateString,
           String taskTracker, Phase phase, Counters counters) {
-    super(taskid, progress, runState, diagnosticInfo, stateString,
+    super(taskid, progress, numSlots, runState, diagnosticInfo, stateString,
           taskTracker, phase, counters);
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 24 14:22:13 2009
@@ -149,8 +149,8 @@
   }
 
   public ReduceTask(String jobFile, TaskAttemptID taskId,
-                    int partition, int numMaps) {
-    super(jobFile, taskId, partition);
+                    int partition, int numMaps, int numSlotsRequired) {
+    super(jobFile, taskId, partition, numSlotsRequired);
     this.numMaps = numMaps;
   }
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Wed Jun 24 14:22:13 2009
@@ -34,11 +34,11 @@
   
   public ReduceTaskStatus() {}
 
-  public ReduceTaskStatus(TaskAttemptID taskid, float progress, State runState,
-          String diagnosticInfo, String stateString, String taskTracker,
-          Phase phase, Counters counters) {
-    super(taskid, progress, runState, diagnosticInfo, stateString, taskTracker,
-            phase, counters);
+  public ReduceTaskStatus(TaskAttemptID taskid, float progress, int numSlots,
+                          State runState, String diagnosticInfo, String stateString, 
+                          String taskTracker, Phase phase, Counters counters) {
+    super(taskid, progress, numSlots, runState, diagnosticInfo, stateString, 
+          taskTracker, phase, counters);
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jun 24 14:22:13 2009
@@ -52,8 +52,12 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
-/** Base class for tasks. */
-abstract class Task implements Writable, Configurable {
+/** 
+ * Base class for tasks.
+ * 
+ * This is NOT a public interface.
+ */
+abstract public class Task implements Writable, Configurable {
   private static final Log LOG =
     LogFactory.getLog(Task.class);
 
@@ -119,6 +123,7 @@
   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
   protected final Counters.Counter spilledRecordsCounter;
+  private int numSlotsRequired;
 
   ////////////////////////////////////////////
   // Constructors
@@ -130,13 +135,15 @@
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
   }
 
-  public Task(String jobFile, TaskAttemptID taskId, int partition) {
+  public Task(String jobFile, TaskAttemptID taskId, int partition, 
+              int numSlotsRequired) {
     this.jobFile = jobFile;
     this.taskId = taskId;
      
     this.partition = partition;
+    this.numSlotsRequired = numSlotsRequired;
     this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, 
-                                                  0.0f, 
+                                                  0.0f, numSlotsRequired, 
                                                   TaskStatus.State.UNASSIGNED, 
                                                   "", "", "", 
                                                   isMapTask() ? 
@@ -153,6 +160,10 @@
   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
   public String getJobFile() { return jobFile; }
   public TaskAttemptID getTaskID() { return taskId; }
+  public int getNumSlotsRequired() {
+    return numSlotsRequired;
+  }
+
   Counters getCounters() { return counters; }
   
   /**
@@ -173,14 +184,14 @@
   /**
    * Return current phase of the task. 
    * needs to be synchronized as communication thread sends the phase every second
-   * @return
+   * @return the current phase of the task
    */
   public synchronized TaskStatus.Phase getPhase(){
     return this.taskStatus.getPhase(); 
   }
   /**
    * Set current phase of the task. 
-   * @param p
+   * @param phase task phase 
    */
   protected synchronized void setPhase(TaskStatus.Phase phase){
     this.taskStatus.setPhase(phase); 
@@ -282,6 +293,7 @@
     Text.writeString(out, jobFile);
     taskId.write(out);
     out.writeInt(partition);
+    out.writeInt(numSlotsRequired);
     taskStatus.write(out);
     skipRanges.write(out);
     out.writeBoolean(skipping);
@@ -295,6 +307,7 @@
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
     partition = in.readInt();
+    numSlotsRequired = in.readInt();
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
     skipRanges.readFields(in);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Jun 24 14:22:13 2009
@@ -68,6 +68,7 @@
   private JobTracker jobtracker;
   private TaskID id;
   private JobInProgress job;
+  private final int numSlotsRequired;
 
   // Status of the TIP
   private int successEventNumber = -1;
@@ -134,7 +135,8 @@
   public TaskInProgress(JobID jobid, String jobFile, 
                         RawSplit rawSplit, 
                         JobTracker jobtracker, JobConf conf, 
-                        JobInProgress job, int partition) {
+                        JobInProgress job, int partition,
+                        int numSlotsRequired) {
     this.jobFile = jobFile;
     this.rawSplit = rawSplit;
     this.jobtracker = jobtracker;
@@ -142,6 +144,7 @@
     this.conf = conf;
     this.partition = partition;
     this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
+    this.numSlotsRequired = numSlotsRequired;
     setMaxTaskAttempts();
     init(jobid);
   }
@@ -152,7 +155,7 @@
   public TaskInProgress(JobID jobid, String jobFile, 
                         int numMaps, 
                         int partition, JobTracker jobtracker, JobConf conf,
-                        JobInProgress job) {
+                        JobInProgress job, int numSlotsRequired) {
     this.jobFile = jobFile;
     this.numMaps = numMaps;
     this.partition = partition;
@@ -160,6 +163,7 @@
     this.job = job;
     this.conf = conf;
     this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
+    this.numSlotsRequired = numSlotsRequired;
     setMaxTaskAttempts();
     init(jobid);
   }
@@ -538,7 +542,9 @@
            newState != TaskStatus.State.UNASSIGNED) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
-                 "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
+                 "' for '" + taskid + "' of TIP '" + getTIPId() + "'" +
+                 " oldTT=" + oldStatus.getTaskTracker() +
+                 " while newTT=" + status.getTaskTracker());
         return false;
       }
 
@@ -968,9 +974,10 @@
       } else {
         split = new BytesWritable();
       }
-      t = new MapTask(jobFile, taskid, partition, splitClass, split);
+      t = new MapTask(jobFile, taskid, partition, splitClass, split,
+                      numSlotsRequired);
     } else {
-      t = new ReduceTask(jobFile, taskid, partition, numMaps);
+      t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsRequired);
     }
     if (jobCleanup) {
       t.setJobCleanupTask();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java Wed Jun 24 14:22:13 2009
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * Used by a {@link JobTracker} to schedule {@link Task}s on
@@ -36,7 +37,7 @@
  * between the job being added (when
  * {@link JobInProgressListener#jobAdded(JobInProgress)} is called)
  * and tasks for that job being assigned (by
- * {@link #assignTasks(TaskTrackerStatus)}).
+ * {@link #assignTasks(TaskTracker)}).
  * @see EagerTaskInitializationListener
  */
 abstract class TaskScheduler implements Configurable {
@@ -80,8 +81,8 @@
    * @param taskTracker The TaskTracker for which we're looking for tasks.
    * @return A list of tasks to run on that TaskTracker, possibly empty.
    */
-  public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
-    throws IOException;
+  public abstract List<Task> assignTasks(TaskTracker taskTracker)
+    throws IOException;
 
   /**
    * Returns a collection of jobs in an order which is specific to 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Wed Jun 24 14:22:13 2009
@@ -49,6 +49,7 @@
   private String diagnosticInfo;
   private String stateString;
   private String taskTracker;
+  private int numSlots;
     
   private long startTime; //in ms
   private long finishTime; 
@@ -61,14 +62,16 @@
 
   public TaskStatus() {
     taskid = new TaskAttemptID();
+    numSlots = 0;
   }
 
-  public TaskStatus(TaskAttemptID taskid, float progress,
+  public TaskStatus(TaskAttemptID taskid, float progress, int numSlots,
                     State runState, String diagnosticInfo,
                     String stateString, String taskTracker,
                     Phase phase, Counters counters) {
     this.taskid = taskid;
     this.progress = progress;
+    this.numSlots = numSlots;
     this.runState = runState;
     this.diagnosticInfo = diagnosticInfo;
     this.stateString = stateString;
@@ -80,6 +83,10 @@
   
   public TaskAttemptID getTaskID() { return taskid; }
   public abstract boolean getIsMap();
+  public int getNumSlots() {
+    return numSlots;
+  }
+
   public float getProgress() { return progress; }
   public void setProgress(float progress) {
     this.progress = progress;
@@ -383,6 +390,7 @@
   public void write(DataOutput out) throws IOException {
     taskid.write(out);
     out.writeFloat(progress);
+    out.writeInt(numSlots);
     WritableUtils.writeEnum(out, runState);
     Text.writeString(out, diagnosticInfo);
     Text.writeString(out, stateString);
@@ -400,6 +408,7 @@
   public void readFields(DataInput in) throws IOException {
     this.taskid.readFields(in);
     setProgress(in.readFloat());
+    this.numSlots = in.readInt();
     this.runState = WritableUtils.readEnum(in, State.class);
     this.diagnosticInfo = Text.readString(in);
     this.stateString = Text.readString(in);
@@ -419,24 +428,27 @@
   // Factory-like methods to create/read/write appropriate TaskStatus objects
   //////////////////////////////////////////////////////////////////////////////
   
-  static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, float progress,
+  static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, 
+                                     float progress, int numSlots,
                                      State runState, String diagnosticInfo,
                                      String stateString, String taskTracker,
                                      Phase phase, Counters counters) 
   throws IOException {
     boolean isMap = in.readBoolean();
-    return createTaskStatus(isMap, taskId, progress, runState, diagnosticInfo, 
-                          stateString, taskTracker, phase, counters);
+    return createTaskStatus(isMap, taskId, progress, numSlots, runState, 
+                            diagnosticInfo, stateString, taskTracker, phase, 
+                            counters);
   }
   
-  static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, float progress,
-                                   State runState, String diagnosticInfo,
-                                   String stateString, String taskTracker,
-                                   Phase phase, Counters counters) { 
-    return (isMap) ? new MapTaskStatus(taskId, progress, runState, 
+  static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, 
+                                     float progress, int numSlots,
+                                     State runState, String diagnosticInfo,
+                                     String stateString, String taskTracker,
+                                     Phase phase, Counters counters) { 
+    return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState, 
                                        diagnosticInfo, stateString, taskTracker, 
                                        phase, counters) :
-                     new ReduceTaskStatus(taskId, progress, runState, 
+                     new ReduceTaskStatus(taskId, progress, numSlots, runState, 
                                           diagnosticInfo, stateString, 
                                           taskTracker, phase, counters);
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 24 14:22:13 2009
@@ -67,6 +67,7 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.pipes.Submitter;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -189,8 +190,8 @@
   private static final String OUTPUT = "output";
   private JobConf fConf;
   private FileSystem localFs;
-  private int maxCurrentMapTasks;
-  private int maxCurrentReduceTasks;
+  private int maxMapSlots;
+  private int maxReduceSlots;
   private int failures;
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
@@ -486,8 +487,8 @@
     }
     
     // RPC initialization
-    int max = maxCurrentMapTasks > maxCurrentReduceTasks ? 
-                       maxCurrentMapTasks : maxCurrentReduceTasks;
+    int max = maxMapSlots > maxReduceSlots ? 
+                       maxMapSlots : maxReduceSlots;
     //set the num handlers to max*2 since canCommit may wait for the duration
     //of a heartbeat RPC
     this.taskReportServer =
@@ -525,8 +526,8 @@
 
     this.indexCache = new IndexCache(this.fConf);
 
-    mapLauncher = new TaskLauncher(maxCurrentMapTasks);
-    reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
+    mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
+    reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
     mapLauncher.start();
     reduceLauncher.start();
     Class<? extends TaskController> taskControllerClass 
@@ -902,10 +903,8 @@
    */
   public TaskTracker(JobConf conf) throws IOException {
     fConf = conf;
-    maxCurrentMapTasks = conf.getInt(
-                  "mapred.tasktracker.map.tasks.maximum", 2);
-    maxCurrentReduceTasks = conf.getInt(
-                  "mapred.tasktracker.reduce.tasks.maximum", 2);
+    maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
+    maxReduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2);
     this.jobTrackAddr = JobTracker.getAddress(conf);
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
         conf.get("mapred.task.tracker.http.address", "0.0.0.0:50060"));
@@ -1174,8 +1173,8 @@
                                        cloneAndResetRunningTaskStatuses(
                                          sendCounters), 
                                        failures, 
-                                       maxCurrentMapTasks,
-                                       maxCurrentReduceTasks); 
+                                       maxMapSlots,
+                                       maxReduceSlots); 
       }
     } else {
       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
@@ -1188,9 +1187,10 @@
     boolean askForNewTask;
     long localMinSpaceStart;
     synchronized (this) {
-      askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || 
-                       status.countReduceTasks() < maxCurrentReduceTasks) &&
-                      acceptNewTasks; 
+      askForNewTask = 
+        ((status.countOccupiedMapSlots() < maxMapSlots || 
+          status.countOccupiedReduceSlots() < maxReduceSlots) && 
+         acceptNewTasks); 
       localMinSpaceStart = minSpaceStart;
     }
     if (askForNewTask) {
@@ -1564,12 +1564,12 @@
     private final int maxSlots;
     private List<TaskInProgress> tasksToLaunch;
 
-    public TaskLauncher(int numSlots) {
+    public TaskLauncher(TaskType taskType, int numSlots) {
       this.maxSlots = numSlots;
       this.numFreeSlots = new IntWritable(numSlots);
       this.tasksToLaunch = new LinkedList<TaskInProgress>();
       setDaemon(true);
-      setName("TaskLauncher for task");
+      setName("TaskLauncher for " + taskType + " tasks");
     }
 
     public void addToTaskQueue(LaunchTaskAction action) {
@@ -1584,9 +1584,9 @@
       tasksToLaunch.clear();
     }
     
-    public void addFreeSlot() {
+    public void addFreeSlots(int numSlots) {
       synchronized (numFreeSlots) {
-        numFreeSlots.set(numFreeSlots.get() + 1);
+        numFreeSlots.set(numFreeSlots.get() + numSlots);
         assert (numFreeSlots.get() <= maxSlots);
         LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
         numFreeSlots.notifyAll();
@@ -1597,22 +1597,29 @@
       while (!Thread.interrupted()) {
         try {
           TaskInProgress tip;
+          Task task;
           synchronized (tasksToLaunch) {
             while (tasksToLaunch.isEmpty()) {
               tasksToLaunch.wait();
             }
             //get the TIP
             tip = tasksToLaunch.remove(0);
-            LOG.info("Trying to launch : " + tip.getTask().getTaskID());
+            task = tip.getTask();
+            LOG.info("Trying to launch : " + tip.getTask().getTaskID() + 
+                     " which needs " + task.getNumSlotsRequired() + " slots");
           }
-          //wait for a slot to run
+          //wait for free slots to run
           synchronized (numFreeSlots) {
-            while (numFreeSlots.get() == 0) {
+            while (numFreeSlots.get() < task.getNumSlotsRequired()) {
+              LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() + 
+                       " to launch " + task.getTaskID() + ", currently we have " + 
+                       numFreeSlots.get() + " free slots");
               numFreeSlots.wait();
             }
             LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
-                " and trying to launch "+tip.getTask().getTaskID());
-            numFreeSlots.set(numFreeSlots.get() - 1);
+                     " and trying to launch "+tip.getTask().getTaskID() + 
+                     " which needs " + task.getNumSlotsRequired() + " slots");
+            numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
             assert (numFreeSlots.get() >= 0);
           }
           synchronized (tip) {
@@ -1621,7 +1628,7 @@
                 tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
                 tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
               //got killed externally while still in the launcher queue
-              addFreeSlot();
+              addFreeSlots(task.getNumSlotsRequired());
               continue;
             }
             tip.slotTaken = true;
@@ -1786,6 +1793,7 @@
       localJobConf = null;
       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
+                                               task.getNumSlotsRequired(),
                                                task.getState(),
                                                diagnosticInfo.toString(), 
                                                "initializing",  
@@ -2347,7 +2355,7 @@
     private synchronized void releaseSlot() {
       if (slotTaken) {
         if (launcher != null) {
-          launcher.addFreeSlot();
+          launcher.addFreeSlots(task.getNumSlotsRequired());
         }
         slotTaken = false;
       }
@@ -2979,11 +2987,11 @@
   }
 
   int getMaxCurrentMapTasks() {
-    return maxCurrentMapTasks;
+    return maxMapSlots;
   }
   
   int getMaxCurrentReduceTasks() {
-    return maxCurrentReduceTasks;
+    return maxReduceSlots;
   }
 
   /**
@@ -3047,7 +3055,7 @@
             JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT);
     totalMemoryAllottedForTasks =
-        maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks
+        maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots
             * reduceSlotSizeMemoryOnTT;
     if (totalMemoryAllottedForTasks < 0) {
       totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed Jun 24 14:22:13 2009
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.TaskStatus.State;
 
 import java.io.*;
 import java.util.*;
@@ -28,9 +31,11 @@
  * of the most recent TaskTrackerStatus objects for each
  * unique TaskTracker it knows about.
  *
+ * This is NOT a public interface!
  **************************************************/
-class TaskTrackerStatus implements Writable {
-
+public class TaskTrackerStatus implements Writable {
+  public static final Log LOG = LogFactory.getLog(TaskTrackerStatus.class);
+  
   static {                                        // register a ctor
     WritableFactories.setFactory
       (TaskTrackerStatus.class,
@@ -253,19 +258,27 @@
   public List<TaskStatus> getTaskReports() {
     return taskReports;
   }
-    
+   
   /**
-   * Return the current MapTask count
+   * Is the given task considered as 'running' ?
+   * @param taskStatus
+   * @return
+   */
+  private boolean isTaskRunning(TaskStatus taskStatus) {
+    TaskStatus.State state = taskStatus.getRunState();
+    return (state == State.RUNNING || state == State.UNASSIGNED || 
+            taskStatus.inTaskCleanupPhase());
+  }
+  
+  /**
+   * Get the number of running map tasks.
+   * @return the number of running map tasks
    */
   public int countMapTasks() {
     int mapCount = 0;
-    for (Iterator it = taskReports.iterator(); it.hasNext();) {
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
       TaskStatus ts = (TaskStatus) it.next();
-      TaskStatus.State state = ts.getRunState();
-      if (ts.getIsMap() &&
-          ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED) ||
-           ts.inTaskCleanupPhase())) {
+      if (ts.getIsMap() && isTaskRunning(ts)) {
         mapCount++;
       }
     }
@@ -273,17 +286,37 @@
   }
 
   /**
-   * Return the current ReduceTask count
+   * Get the number of occupied map slots.
+   * @return the number of occupied map slots
+   */
+  public int countOccupiedMapSlots() {
+    int mapSlotsCount = 0;
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+      TaskStatus ts = (TaskStatus) it.next();
+      if (ts.getIsMap() && isTaskRunning(ts)) {
+        mapSlotsCount += ts.getNumSlots();
+      }
+    }
+    return mapSlotsCount;
+  }
+  
+  /**
+   * Get available map slots.
+   * @return available map slots
+   */
+  public int getAvailableMapSlots() {
+    return getMaxMapSlots() - countOccupiedMapSlots();
+  }
+  
+  /**
+   * Get the number of running reduce tasks.
+   * @return the number of running reduce tasks
    */
   public int countReduceTasks() {
     int reduceCount = 0;
-    for (Iterator it = taskReports.iterator(); it.hasNext();) {
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
       TaskStatus ts = (TaskStatus) it.next();
-      TaskStatus.State state = ts.getRunState();
-      if ((!ts.getIsMap()) &&
-          ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED) ||
-           ts.inTaskCleanupPhase())) {
+      if ((!ts.getIsMap()) && isTaskRunning(ts)) {
         reduceCount++;
       }
     }
@@ -291,6 +324,30 @@
   }
 
   /**
+   * Get the number of occupied reduce slots.
+   * @return the number of occupied reduce slots
+   */
+  public int countOccupiedReduceSlots() {
+    int reduceSlotsCount = 0;
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+      TaskStatus ts = (TaskStatus) it.next();
+      if ((!ts.getIsMap()) && isTaskRunning(ts)) {
+        reduceSlotsCount += ts.getNumSlots();
+      }
+    }
+    return reduceSlotsCount;
+  }
+  
+  /**
+   * Get available reduce slots.
+   * @return available reduce slots
+   */
+  public int getAvailableReduceSlots() {
+    return getMaxReduceSlots() - countOccupiedReduceSlots();
+  }
+  
+
+  /**
    */
   public long getLastSeen() {
     return lastSeen;
@@ -302,15 +359,18 @@
   }
 
   /**
-   * Get the maximum concurrent tasks for this node.  (This applies
-   * per type of task - a node with maxTasks==1 will run up to 1 map
-   * and 1 reduce concurrently).
-   * @return maximum tasks this node supports
+   * Get the maximum map slots for this node.
+   * @return the maximum map slots for this node
    */
-  public int getMaxMapTasks() {
+  public int getMaxMapSlots() {
     return maxMapTasks;
   }
-  public int getMaxReduceTasks() {
+  
+  /**
+   * Get the maximum reduce slots for this node.
+   * @return the maximum reduce slots for this node
+   */
+  public int getMaxReduceSlots() {
     return maxReduceTasks;
   }  
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Jun 24 14:22:13 2009
@@ -55,9 +55,10 @@
    * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
    * Version 16 Change in signature of getTask() for HADOOP-5488
    * Version 17 Modified TaskID to be aware of the new TaskTypes
+   * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * */
 
-  public static final long versionID = 17L;
+  public static final long versionID = 18L;
   
   /**
    * Called when a child task process starts, to get its task.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java Wed Jun 24 14:22:13 2009
@@ -26,5 +26,7 @@
   TOTAL_LAUNCHED_REDUCES,
   OTHER_LOCAL_MAPS,
   DATA_LOCAL_MAPS,
-  RACK_LOCAL_MAPS
+  RACK_LOCAL_MAPS,
+  FALLOW_SLOTS_MILLIS_MAPS,
+  FALLOW_SLOTS_MILLIS_REDUCES
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties Wed Jun 24 14:22:13 2009
@@ -1,12 +1,13 @@
 # ResourceBundle properties file for job-level counters
 
-CounterGroupName=              Job Counters 
-
-NUM_FAILED_MAPS.name=          Failed map tasks
-NUM_FAILED_REDUCES.name=       Failed reduce tasks
-TOTAL_LAUNCHED_MAPS.name=      Launched map tasks
-TOTAL_LAUNCHED_REDUCES.name=   Launched reduce tasks
-OTHER_LOCAL_MAPS.name=         Other local map tasks
-DATA_LOCAL_MAPS.name=          Data-local map tasks
-RACK_LOCAL_MAPS.name=          Rack-local map tasks
+CounterGroupName=                  Job Counters 
 
+NUM_FAILED_MAPS.name=              Failed map tasks
+NUM_FAILED_REDUCES.name=           Failed reduce tasks
+TOTAL_LAUNCHED_MAPS.name=          Launched map tasks
+TOTAL_LAUNCHED_REDUCES.name=       Launched reduce tasks
+OTHER_LOCAL_MAPS.name=             Other local map tasks
+DATA_LOCAL_MAPS.name=              Data-local map tasks
+RACK_LOCAL_MAPS.name=              Rack-local map tasks
+FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
+FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java?rev=788036&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java Wed Jun 24 14:22:13 2009
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.jobtracker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobInProgress;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * The representation of a single <code>TaskTracker</code> as seen by 
+ * the {@link JobTracker}.
+ * <p>
+ * Besides the most recently set {@link TaskTrackerStatus}, this records at
+ * most one job per slot type ({@link TaskType#MAP} and
+ * {@link TaskType#REDUCE}) that holds reserved ("fallow") slots on this
+ * tracker.
+ */
+public class TaskTracker {
+  static final Log LOG = LogFactory.getLog(TaskTracker.class);
+  
+  final private String trackerName;
+  private TaskTrackerStatus status;
+
+  // Job holding reserved (fallow) map slots on this tracker, or null if none.
+  private JobInProgress jobForFallowMapSlot;
+  // Job holding reserved (fallow) reduce slots on this tracker, or null if none.
+  private JobInProgress jobForFallowReduceSlot;
+  
+  /**
+   * Create a new {@link TaskTracker}.
+   * @param trackerName Unique identifier for the <code>TaskTracker</code>
+   */
+  public TaskTracker(String trackerName) {
+    this.trackerName = trackerName;
+  }
+
+  /**
+   * Get the unique identifier for the {@link TaskTracker}
+   * @return the unique identifier for the <code>TaskTracker</code>
+   */
+  public String getTrackerName() {
+    return trackerName;
+  }
+
+  /**
+   * Get the current {@link TaskTrackerStatus} of the <code>TaskTracker</code>.
+   * @return the current <code>TaskTrackerStatus</code> of the 
+   *         <code>TaskTracker</code>, or <code>null</code> if
+   *         {@link #setStatus(TaskTrackerStatus)} has not been called yet
+   */
+  public TaskTrackerStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Set the current {@link TaskTrackerStatus} of the <code>TaskTracker</code>.
+   * @param status the current <code>TaskTrackerStatus</code> of the 
+   *               <code>TaskTracker</code>
+   */
+  public void setStatus(TaskTrackerStatus status) {
+    this.status = status;
+  }
+
+  /**
+   * Get the number of currently available slots on this tasktracker for the 
+   * given type of the task.
+   * <p>
+   * {@link TaskType#MAP} is answered from the map slots of the current
+   * status; every other type is answered from the reduce slots. The status
+   * must have been set via {@link #setStatus(TaskTrackerStatus)}.
+   * @param taskType the {@link TaskType} to check for number of available slots 
+   * @return the number of currently available slots for the given 
+   *         <code>taskType</code>
+   */
+  public int getAvailableSlots(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(trackerName + " getAvailSlots:" +
+                  " max(m)=" + status.getMaxMapSlots() + 
+                  " occupied(m)=" + status.countOccupiedMapSlots());
+      }
+      return status.getAvailableMapSlots();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(trackerName + " getAvailSlots:" +
+                " max(r)=" + status.getMaxReduceSlots() + 
+                " occupied(r)=" + status.countOccupiedReduceSlots());
+    }
+    return status.getAvailableReduceSlots();
+  }
+  
+  /**
+   * Get the {@link JobInProgress} for which the fallow slot(s) are held.
+   * Any type other than {@link TaskType#MAP} is looked up as a reduce slot.
+   * @param taskType {@link TaskType} of the task
+   * @return the job for which the fallow slot(s) are held, 
+   *         <code>null</code> if there are no fallow slots
+   */
+  public JobInProgress getJobForFallowSlot(TaskType taskType) {
+    return 
+      (taskType == TaskType.MAP) ? jobForFallowMapSlot : jobForFallowReduceSlot;
+  }
+
+  /**
+   * Reserve specified number of slots for a given <code>job</code>.
+   * <p>
+   * Re-reserving for the job that already holds the reservation is allowed
+   * and is passed on to the job again.
+   * @param taskType {@link TaskType} of the task; must be
+   *                 {@link TaskType#MAP} or {@link TaskType#REDUCE}
+   * @param job the job for which slots on this <code>TaskTracker</code>
+   *             are to be reserved
+   * @param numSlots number of slots to be reserved
+   * @throws IllegalArgumentException if <code>taskType</code> is neither
+   *         map nor reduce
+   * @throws RuntimeException if slots of this type are already reserved
+   *         for a different job
+   */
+  public void reserveSlots(TaskType taskType, JobInProgress job, int numSlots) {
+    checkSlotType(taskType);
+    JobID jobId = job.getJobID();
+    JobInProgress reservedJob = getJobForFallowSlot(taskType);
+    if (reservedJob != null && !reservedJob.getJobID().equals(jobId)) {
+      throw new RuntimeException(trackerName + " already has " + taskType +
+                                 " slots reserved for " +
+                                 reservedJob.getJobID() + "; being" +
+                                 " asked to reserve " + numSlots + " for " +
+                                 jobId);
+    }
+    setJobForFallowSlot(taskType, job);
+    
+    // Let the job track this reservation too.
+    job.reserveTaskTracker(this, taskType, numSlots);
+    LOG.info(trackerName + ": Reserved " + numSlots + " " + taskType + 
+             " slots for " + jobId);
+  }
+  
+  /**
+   * Free the slots of type <code>taskType</code> on this
+   * <code>TaskTracker</code> which were reserved for <code>job</code>.
+   * @param taskType {@link TaskType} of the task; must be
+   *                 {@link TaskType#MAP} or {@link TaskType#REDUCE}
+   * @param job job whose slots are being un-reserved
+   * @throws IllegalArgumentException if <code>taskType</code> is neither
+   *         map nor reduce
+   * @throws RuntimeException if slots of this type are not currently
+   *         reserved for <code>job</code>
+   */
+  public void unreserveSlots(TaskType taskType, JobInProgress job) {
+    checkSlotType(taskType);
+    JobID jobId = job.getJobID();
+    JobInProgress reservedJob = getJobForFallowSlot(taskType);
+    if (reservedJob == null || !reservedJob.getJobID().equals(jobId)) {
+      // Report the actual holder rather than a confusing "reserved for null".
+      String holder = (reservedJob == null) ? 
+          "no " + taskType + " slots reserved" :
+          taskType + " slots reserved for " + reservedJob.getJobID();
+      throw new RuntimeException(trackerName + " has " + holder +
+                                 "; being asked to un-reserve for " + jobId);
+    }
+    setJobForFallowSlot(taskType, null);
+    
+    // Let the job drop its record of this reservation too.
+    job.unreserveTaskTracker(this, taskType);
+    LOG.info(trackerName + ": Unreserved " + taskType + " slots for " + jobId);
+  }
+  
+  /**
+   * Cleanup when the {@link TaskTracker} is declared as 'lost' by the 
+   * JobTracker: release any map and reduce slots still reserved on it.
+   */
+  public void lost() {
+    // Inform jobs which have reserved slots on this tasktracker
+    if (jobForFallowMapSlot != null) {
+      unreserveSlots(TaskType.MAP, jobForFallowMapSlot);
+    }
+    if (jobForFallowReduceSlot != null) {
+      unreserveSlots(TaskType.REDUCE, jobForFallowReduceSlot);
+    }
+  }
+  
+  /**
+   * Record <code>job</code> (or <code>null</code> to clear) as the holder of
+   * the fallow slots of the given map or reduce type.
+   */
+  private void setJobForFallowSlot(TaskType taskType, JobInProgress job) {
+    if (taskType == TaskType.MAP) {
+      jobForFallowMapSlot = job;
+    } else {
+      jobForFallowReduceSlot = job;
+    }
+  }
+
+  /**
+   * Reject task types for which no slot reservations are tracked.
+   */
+  private static void checkSlotType(TaskType taskType) {
+    if (taskType != TaskType.MAP && taskType != TaskType.REDUCE) {
+      throw new IllegalArgumentException(
+          "Slots can only be (un)reserved for MAP or REDUCE tasks, not " + taskType);
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java.orig
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java.orig?rev=788036&view=auto
==============================================================================
    (empty)

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Wed Jun 24 14:22:13 2009
@@ -696,7 +696,7 @@
             ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));
 
 
-        TaskTrackerStatus ttStatus = jt.getTaskTracker(ts.getTaskTracker());
+        TaskTrackerStatus ttStatus = jt.getTaskTrackerStatus(ts.getTaskTracker());
 
         if (ttStatus != null) {
           assertTrue("http port of task attempt " + idStr + " obtained from " +

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=788036&r1=788035&r2=788036&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java Wed Jun 24 14:22:13 2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 import junit.framework.TestCase;
 
@@ -66,7 +67,7 @@
   public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler {
 
     @Override
-    public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+    public synchronized List<Task> assignTasks(TaskTracker taskTracker)
         throws IOException {
       Collection<JobInProgress> jips = jobQueueJobInProgressListener
           .getJobQueue();



Mime
View raw message