hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r785794 [2/3] - in /hadoop/core/branches/HADOOP-4687: core/ core/src/java/ core/src/java/org/apache/hadoop/fs/ core/src/java/org/apache/hadoop/util/ core/src/test/core/ core/src/test/core/org/apache/hadoop/util/ hdfs/ hdfs/src/java/ hdfs/sr...
Date Wed, 17 Jun 2009 20:55:54 GMT
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=785794&r1=785793&r2=785794&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jun 17 20:55:51 2009
@@ -21,6 +21,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -103,7 +107,7 @@
   private volatile boolean jobFailed = false;
 
   JobPriority priority = JobPriority.NORMAL;
-  final JobTracker jobtracker;
+  protected JobTracker jobtracker;
 
   // NetworkTopology Node to the set of TIPs
   Map<Node, List<TaskInProgress>> nonRunningMapCache;
@@ -129,14 +133,14 @@
   // A list of cleanup tasks for the reduce task attempts, to be launched
   List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
 
-  private final int maxLevel;
+  private int maxLevel;
 
   /**
    * A special value indicating that 
    * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
    * schedule any available map tasks for this job, including speculative tasks.
    */
-  private final int anyCacheLevel;
+  private int anyCacheLevel;
   
   /**
    * A special value indicating that 
@@ -189,9 +193,13 @@
   
   private MetricsRecord jobMetrics;
   
-  // Maximum no. of fetch-failure notifications after which
-  // the map task is killed
+  // Maximum no. of fetch-failure notifications after which map task is killed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+
+  // Don't lower speculativeCap below one TT's worth (for small clusters)
+  private static final int MIN_SPEC_CAP = 10;
+  
+  private static final float MIN_SLOTS_CAP = 0.01f;
   
   // Map of mapTaskId -> no. of fetch failures
   private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
@@ -199,19 +207,65 @@
 
   private Object schedulingInfo;
 
-  
+  //thresholds for speculative execution
+  private float slowTaskThreshold;
+  private float speculativeCap;
+  private float slowNodeThreshold; //standard deviations
+
+  //Statistics are maintained for a couple of things
+  //mapTaskStats is used for maintaining statistics about
+  //the completion time of map tasks on the trackers. On a per
+  //tracker basis, the mean time for task completion is maintained
+  private DataStatistics mapTaskStats = new DataStatistics();
+  //reduceTaskStats is used for maintaining statistics about
+  //the completion time of reduce tasks on the trackers. On a per
+  //tracker basis, the mean time for task completion is maintained
+  private DataStatistics reduceTaskStats = new DataStatistics();
+  //trackerMapStats used to maintain a mapping from the tracker to the
+  //the statistics about completion time of map tasks
+  private Map<String,DataStatistics> trackerMapStats = 
+    new HashMap<String,DataStatistics>();
+  //trackerReduceStats used to maintain a mapping from the tracker to the
+  //the statistics about completion time of reduce tasks
+  private Map<String,DataStatistics> trackerReduceStats = 
+    new HashMap<String,DataStatistics>();
+  //runningMapStats used to maintain the RUNNING map tasks' statistics 
+  private DataStatistics runningMapTaskStats = new DataStatistics();
+  //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
+  private DataStatistics runningReduceTaskStats = new DataStatistics();
+ 
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
    */
-  protected JobInProgress(JobID jobid, JobConf conf) {
+  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
     this.conf = conf;
     this.jobId = jobid;
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
     this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
     this.anyCacheLevel = this.maxLevel+1;
-    this.jobtracker = null;
+    this.jobtracker = tracker;
     this.restartCount = 0;
+    
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.resourceEstimator = new ResourceEstimator(this);
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
+    (numMapTasks + numReduceTasks + 10);
+    
+    this.slowTaskThreshold = Math.max(0.0f,
+        conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+    this.speculativeCap = conf.getFloat(
+        "mapred.speculative.execution.speculativeCap",0.1f);
+    this.slowNodeThreshold = conf.getFloat(
+        "mapred.speculative.execution.slowNodeThreshold",1.0f);
+
   }
   
   /**
@@ -285,6 +339,19 @@
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
+    
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.slowTaskThreshold = Math.max(0.0f,
+        conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+    this.speculativeCap = conf.getFloat(
+        "mapred.speculative.execution.speculativeCap",0.1f);
+    this.slowNodeThreshold = conf.getFloat(
+        "mapred.speculative.execution.slowNodeThreshold",1.0f);
+
   }
 
   /**
@@ -443,7 +510,7 @@
     }
         
     // set the launch time
-    this.launchTime = System.currentTimeMillis();
+    this.launchTime = jobtracker.getClock().getTime();
 
     //
     // Create reduce tasks
@@ -939,9 +1006,9 @@
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
-        
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel,
-                                status.mapProgress());
+       
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
+        anyCacheLevel);
     if (target == -1) {
       return null;
     }
@@ -1001,8 +1068,7 @@
       return null;
     }
 
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
-                                status.mapProgress());
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
     if (target == -1) {
       return null;
     }
@@ -1025,7 +1091,7 @@
     }
 
     int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
-                                NON_LOCAL_CACHE_LEVEL, status.mapProgress());
+                                NON_LOCAL_CACHE_LEVEL);
     if (target == -1) {
       return null;
     }
@@ -1203,8 +1269,7 @@
       return null;
     }
 
-    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts, 
-                                    status.reduceProgress());
+    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts);
     if (target == -1) {
       return null;
     }
@@ -1267,15 +1332,21 @@
       name = Values.MAP.name();
       counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
-      if (tip.getActiveTasks().size() > 1)
+      if (tip.isSpeculating()) {
         speculativeMapTasks++;
+        LOG.debug("Chosen speculative task, current speculativeMap task count: "
+            + speculativeMapTasks);
+      }
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
       counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
-      if (tip.getActiveTasks().size() > 1)
+      if (tip.isSpeculating()) {
         speculativeReduceTasks++;
+        LOG.debug("Chosen speculative task, current speculativeReduce task count: "
+          + speculativeReduceTasks);
+      }
       metrics.launchReduce(id);
     }
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
@@ -1433,7 +1504,7 @@
     String[] splitLocations = tip.getSplitLocations();
 
     // Remove the TIP from the list for running non-local maps
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalRunningMaps.remove(tip);
       return;
     }
@@ -1473,8 +1544,9 @@
    * Adds a map tip to the list of running maps.
    * @param tip the tip that needs to be scheduled as running
    */
-  private synchronized void scheduleMap(TaskInProgress tip) {
+  protected synchronized void scheduleMap(TaskInProgress tip) {
     
+    runningMapTaskStats.add(0.0f);
     if (runningMapCache == null) {
       LOG.warn("Running cache for maps is missing!! " 
                + "Job details are missing.");
@@ -1483,7 +1555,7 @@
     String[] splitLocations = tip.getSplitLocations();
 
     // Add the TIP to the list of non-local running TIPs
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalRunningMaps.add(tip);
       return;
     }
@@ -1508,7 +1580,8 @@
    * Adds a reduce tip to the list of running reduces
    * @param tip the tip that needs to be scheduled as running
    */
-  private synchronized void scheduleReduce(TaskInProgress tip) {
+  protected synchronized void scheduleReduce(TaskInProgress tip) {
+    runningReduceTaskStats.add(0.0f);
     if (runningReduces == null) {
       LOG.warn("Running cache for reducers missing!! "
                + "Job details are missing.");
@@ -1612,57 +1685,71 @@
     return null;
   }
   
+  public boolean hasSpeculativeMaps() {
+    return hasSpeculativeMaps;
+  }
+
+  public boolean hasSpeculativeReduces() {
+    return hasSpeculativeReduces;
+  }
+
   /**
-   * Find a speculative task
-   * @param list a list of tips
-   * @param taskTracker the tracker that has requested a tip
-   * @param avgProgress the average progress for speculation
-   * @param currentTime current time in milliseconds
-   * @param shouldRemove whether to remove the tips
-   * @return a tip that can be speculated on the tracker
-   */
-  private synchronized TaskInProgress findSpeculativeTask(
-      Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
-      double avgProgress, long currentTime, boolean shouldRemove) {
+   * Retrieve a task for speculation.
+   * If a task slot becomes available and there are less than SpeculativeCap
+   * speculative tasks running: 
+   *  1)Ignore the request if the TT's progressRate is < SlowNodeThreshold
+   *  2)Choose candidate tasks - those tasks whose progress rate is below
+   *    slowTaskThreshold * mean(progress-rates)
+   *  3)Speculate task that's expected to complete last
+   * @param list pool of tasks to choose from
+   * @param taskTrackerName the name of the TaskTracker asking for a task
+   * @param taskTrackerHost the hostname of the TaskTracker asking for a task
+   * @return the TIP to speculatively re-execute
+   */
+  protected synchronized TaskInProgress findSpeculativeTask(
+      Collection<TaskInProgress> list, String taskTrackerName, 
+      String taskTrackerHost) {
+    if (list.isEmpty()) {
+      return null;
+    }
+    long now = jobtracker.getClock().getTime();
+    if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list)) {
+      return null;
+    }
+    // List of speculatable candidates, start with all, and chop it down
+    ArrayList<TaskInProgress> candidates = new ArrayList<TaskInProgress>(list);
     
-    Iterator<TaskInProgress> iter = list.iterator();
-
+    Iterator<TaskInProgress> iter = candidates.iterator();
     while (iter.hasNext()) {
       TaskInProgress tip = iter.next();
-      // should never be true! (since we delete completed/failed tasks)
-      if (!tip.isRunning()) {
-        iter.remove();
-        continue;
-      }
-
-      if (!tip.hasRunOnMachine(ttStatus.getHost(), 
-                               ttStatus.getTrackerName())) {
-        if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
-          // In case of shared list we don't remove it. Since the TIP failed 
-          // on this tracker can be scheduled on some other tracker.
-          if (shouldRemove) {
-            iter.remove(); //this tracker is never going to run it again
-          }
-          return tip;
-        } 
-      } else {
-        // Check if this tip can be removed from the list.
-        // If the list is shared then we should not remove.
-        if (shouldRemove) {
-          // This tracker will never speculate this tip
+      if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
+          !tip.canBeSpeculated(now)) {
+          //remove it from candidates
           iter.remove();
-        }
       }
     }
-    return null;
+    //resort according to expected time till completion
+    Comparator<TaskInProgress> LateComparator = 
+      new EstimatedTimeLeftComparator(now);
+    Collections.sort(candidates, LateComparator);
+    if (candidates.size() > 0 ) {
+      TaskInProgress tip = candidates.get(0);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Chose task " + tip.getTIPId() + ". Statistics: Task's : " +
+            tip.getCurrentProgressRate(now) + " Job's : " + 
+            (tip.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
+      }
+      return tip;
+    } else {
+      return null;
+    }
   }
-  
+
   /**
    * Find new map task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param numUniqueHosts The number of hosts that run task trackers
-   * @param avgProgress The average progress of this kind of task in this job
    * @param maxCacheLevel The maximum topology level until which to schedule
    *                      maps. 
    *                      A value of {@link #anyCacheLevel} implies any 
@@ -1675,14 +1762,14 @@
   private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
                                           final int clusterSize,
                                           final int numUniqueHosts,
-                                          final int maxCacheLevel,
-                                          final double avgProgress) {
+                                          final int maxCacheLevel) {
+    String taskTrackerName = tts.getTrackerName();
+    String taskTrackerHost = tts.getHost();
     if (numMapTasks == 0) {
       LOG.info("No maps to schedule for " + profile.getJobID());
       return -1;
     }
 
-    String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
     //
@@ -1694,7 +1781,7 @@
       return -1;
     }
     
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!shouldRunOnTaskTracker(taskTrackerName)) {
       return -1;
     }
 
@@ -1821,82 +1908,61 @@
     // 
  
     if (hasSpeculativeMaps) {
-      long currentTime = System.currentTimeMillis();
-
-      // 1. Check bottom up for speculative tasks from the running cache
-      if (node != null) {
-        Node key = node;
-        for (int level = 0; level < maxLevel; ++level) {
-          Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
-          if (cacheForLevel != null) {
-            tip = findSpeculativeTask(cacheForLevel, tts, 
-                                      avgProgress, currentTime, level == 0);
-            if (tip != null) {
-              if (cacheForLevel.size() == 0) {
-                runningMapCache.remove(key);
-              }
-              return tip.getIdWithinJob();
-            }
-          }
-          key = key.getParent();
-        }
+      tip = getSpeculativeMap(taskTrackerName, taskTrackerHost);
+      if (tip != null) {
+        return tip.getIdWithinJob();
       }
+    }
+   return -1;
+  }
 
-      // 2. Check breadth-wise for speculative tasks
-      
-      for (Node parent : nodesAtMaxLevel) {
-        // ignore the parent which is already scanned
-        if (parent == nodeParentAtMaxLevel) {
-          continue;
-        }
+  private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName, 
+      String taskTrackerHost) {
 
-        Set<TaskInProgress> cache = runningMapCache.get(parent);
-        if (cache != null) {
-          tip = findSpeculativeTask(cache, tts, avgProgress, 
-                                    currentTime, false);
-          if (tip != null) {
-            // remove empty cache entries
-            if (cache.size() == 0) {
-              runningMapCache.remove(parent);
-            }
-            LOG.info("Choosing a non-local task " + tip.getTIPId() 
-                     + " for speculation");
-            return tip.getIdWithinJob();
-          }
-        }
-      }
-
-      // 3. Check non-local tips for speculation
-      tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, 
-                                currentTime, false);
-      if (tip != null) {
-        LOG.info("Choosing a non-local task " + tip.getTIPId() 
-                 + " for speculation");
-        return tip.getIdWithinJob();
+    //////// Populate allTips with all TaskInProgress
+    Set<TaskInProgress> allTips = new HashSet<TaskInProgress>();
+    
+    // collection of node at max level in the cache structure
+    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
+    // Add all tasks from max-level nodes breadth-wise
+    for (Node parent : nodesAtMaxLevel) {
+      Set<TaskInProgress> cache = runningMapCache.get(parent);
+      if (cache != null) {
+        allTips.addAll(cache);
       }
     }
+    // Add all non-local TIPs
+    allTips.addAll(nonLocalRunningMaps);
     
-    return -1;
+    ///////// Select a TIP to run on
+    TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName, 
+        taskTrackerHost);
+    
+    if (tip != null) {
+      LOG.info("Choosing map task " + tip.getTIPId() + 
+          " for speculative execution");
+    } else {
+      LOG.debug("No speculative map task found for tracker " + taskTrackerName);
+    }
+    return tip;
   }
-
+  
   /**
    * Find new reduce task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param numUniqueHosts The number of hosts that run task trackers
-   * @param avgProgress The average progress of this kind of task in this job
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
                                              int clusterSize,
-                                             int numUniqueHosts,
-                                             double avgProgress) {
+                                             int numUniqueHosts) {
+    String taskTrackerName = tts.getTrackerName();
+    String taskTrackerHost = tts.getHost();
     if (numReduceTasks == 0) {
       LOG.info("No reduces to schedule for " + profile.getJobID());
       return -1;
     }
-
-    String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
 
     // Update the last-known clusterSize
@@ -1906,14 +1972,14 @@
       return -1;
     }
 
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!shouldRunOnTaskTracker(taskTrackerName)) {
       return -1;
     }
 
     long outSize = resourceEstimator.getEstimatedReduceInputSize();
     long availSpace = tts.getResourceStatus().getAvailableSpace();
     if(availSpace < outSize) {
-      LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
+      LOG.warn("No room for reduce task. Node " + taskTrackerName + " has " +
                 availSpace + 
                " bytes free; but we expect reduce input to take " + outSize);
 
@@ -1930,16 +1996,187 @@
 
     // 2. check for a reduce tip to be speculated
     if (hasSpeculativeReduces) {
-      tip = findSpeculativeTask(runningReduces, tts, avgProgress, 
-                                System.currentTimeMillis(), false);
+      tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost);
       if (tip != null) {
-        scheduleReduce(tip);
         return tip.getIdWithinJob();
       }
     }
 
     return -1;
   }
+
+  private synchronized TaskInProgress getSpeculativeReduce(
+      String taskTrackerName, String taskTrackerHost) {
+    TaskInProgress tip = findSpeculativeTask(
+        runningReduces, taskTrackerName, taskTrackerHost);
+    if (tip != null) {
+      LOG.info("Choosing reduce task " + tip.getTIPId() + 
+          " for speculative execution");
+    }else {
+      LOG.debug("No speculative map task found for tracker " + taskTrackerHost);
+    }
+    return tip;
+  }
+
+    /**
+     * Check to see if the maximum number of speculative tasks are
+     * already being executed currently.
+     * @param tasks the set of tasks to test
+     * @return has the cap been reached?
+     */
+   private boolean atSpeculativeCap(Collection<TaskInProgress> tasks) {
+     float numTasks = tasks.size();
+     if (numTasks == 0){
+       return true; // avoid divide by zero
+     }
+
+     //return true if totalSpecTask < max(10, 0.01 * total-slots, 
+     //                                   0.1 * total-running-tasks)
+
+     if (speculativeMapTasks + speculativeReduceTasks < MIN_SPEC_CAP) {
+       return false; // at least one slow tracker's worth of slots(default=10)
+     }
+     ClusterStatus c = jobtracker.getClusterStatus(false); 
+     int numSlots = c.getMaxMapTasks() + c.getMaxReduceTasks();
+     if ((float)(speculativeMapTasks + speculativeReduceTasks) < 
+       numSlots * MIN_SLOTS_CAP) {
+       return false;
+     }
+     boolean atCap = (((float)(speculativeMapTasks+
+         speculativeReduceTasks)/numTasks) >= speculativeCap);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " +
+           ((float)(speculativeMapTasks+speculativeReduceTasks)/numTasks)+
+           ", so atSpecCap() is returning "+atCap);
+     }
+     return atCap;
+   }
+  
+  /**
+   * A class for comparing the estimated time to completion of two tasks
+   */
+  private static class EstimatedTimeLeftComparator 
+  implements Comparator<TaskInProgress> {
+    private long time;
+    public EstimatedTimeLeftComparator(long now) {
+      this.time = now;
+    }
+    /**
+     * Estimated time to completion is measured as:
+     *   % of task left to complete (1 - progress) / progress rate of the task.
+     * 
+     * This assumes that tasks are linear in their progress, which is 
+     * often wrong, especially since progress for reducers is currently
+     * calculated by evenly weighting their three stages (shuffle, sort, map)
+     * which rarely account for 1/3 each. This should be fixed in the future
+     * by calculating progressRate more intelligently or splitting these
+     * multi-phase tasks into individual tasks.
+     * 
+     * The ordering this comparator defines is: task1 < task2 if task1 is
+     * estimated to finish farther in the future => compare(t1,t2) returns -1
+     */
+    public int compare(TaskInProgress tip1, TaskInProgress tip2) {
+      //we have to use the Math.max in the denominator to avoid divide by zero
+      //error because prog and progRate can both be zero (if one is zero,
+      //the other one will be 0 too).
+      //We use inverse of time_reminaing=[(1- prog) / progRate]
+      //so that (1-prog) is in denom. because tasks can have arbitrarily 
+      //low progRates in practice (e.g. a task that is half done after 1000
+      //seconds will have progRate of 0.0000005) so we would rather 
+      //use Math.maxnon (1-prog) by putting it in the denominator 
+      //which will cause tasks with prog=1 look 99.99% done instead of 100%
+      //which is okay
+      double t1 = tip1.getCurrentProgressRate(time) / Math.max(0.0001, 
+          1.0 - tip1.getProgress());
+      double t2 = tip2.getCurrentProgressRate(time) / Math.max(0.0001, 
+          1.0 - tip2.getProgress());
+      if (t1 < t2) return -1;
+      else if (t2 < t1) return 1;
+      else return 0;
+    }
+  }
+  
+  /**
+   * Compares the ave progressRate of tasks that have finished on this 
+   * taskTracker to the ave of all succesfull tasks thus far to see if this 
+   * TT one is too slow for speculating.
+   * slowNodeThreshold is used to determine the number of standard deviations
+   * @param taskTracker the name of the TaskTracker we are checking
+   * @return is this TaskTracker slow
+   */
+  protected boolean isSlowTracker(String taskTracker) {
+    if (trackerMapStats.get(taskTracker) != null &&
+        trackerMapStats.get(taskTracker).mean() -
+        mapTaskStats.mean() > mapTaskStats.std()*slowNodeThreshold) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Tracker " + taskTracker + 
+            " declared slow. trackerMapStats.get(taskTracker).mean() :" + trackerMapStats.get(taskTracker).mean() +
+            " mapTaskStats :" + mapTaskStats);
+      }
+      return true;
+    }
+    if (trackerReduceStats.get(taskTracker) != null && 
+        trackerReduceStats.get(taskTracker).mean() -
+        reduceTaskStats.mean() > reduceTaskStats.std()*slowNodeThreshold) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Tracker " + taskTracker + 
+            " declared slow. trackerReduceStats.get(taskTracker).mean() :" + trackerReduceStats.get(taskTracker).mean() +
+            " reduceTaskStats :" + reduceTaskStats);
+      }
+      return true;
+    }
+    return false;
+  }
+  
+  static class DataStatistics{
+    private int count = 0;
+    private double sum = 0;
+    private double sumSquares = 0;
+    
+    public DataStatistics() {
+    }
+    
+    public DataStatistics(double initNum) {
+      this.count = 1;
+      this.sum = initNum;
+      this.sumSquares = initNum * initNum;
+    }
+    
+    public void add(double newNum) {
+      this.count++;
+      this.sum += newNum;
+      this.sumSquares += newNum * newNum;
+    }
+
+    public void updateStatistics(double old, double update) {
+      sub(old);
+      add(update);
+    }
+    private void sub(double oldNum) {
+      this.count--;
+      this.sum -= oldNum;
+      this.sumSquares -= oldNum * oldNum;
+    }
+    
+    public double mean() {
+      return sum/count;      
+    }
+  
+    public double var() {
+      // E(X^2) - E(X)^2
+      return (sumSquares/count) - mean() * mean();
+    }
+    
+    public double std() {
+      return Math.sqrt(this.var());
+    }
+    
+    public String toString() {
+      return "DataStatistics: count is " + count + ", sum is " + sum + 
+      ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+    }
+    
+  }
   
   private boolean shouldRunOnTaskTracker(String taskTracker) {
     //
@@ -2002,7 +2239,6 @@
                                             TaskStatus status)
   {
     TaskAttemptID taskid = status.getTaskID();
-    int oldNumAttempts = tip.getActiveTasks().size();
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
         
     // Sanity check: is the TIP already complete? 
@@ -2018,10 +2254,9 @@
       }
       return false;
     } 
-
+    boolean wasSpeculating = tip.isSpeculating(); //store this fact
     LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
              " successfully.");          
-
     // Mark the TIP as complete
     tip.completed(taskid);
     resourceEstimator.updateWithCompletedTask(status, tip);
@@ -2059,7 +2294,6 @@
                                 tip.getExecFinishTime(),
                                 status.getCounters()); 
         
-    int newNumAttempts = tip.getActiveTasks().size();
     if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
@@ -2096,12 +2330,11 @@
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
-      // check if this was a sepculative task
-      if (oldNumAttempts > 1) {
-        speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
-      }
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
+      if (hasSpeculativeMaps) {
+        updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats);
+      }
       // remove the completed map from the resp running caches
       retireMap(tip);
       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
@@ -2109,21 +2342,66 @@
       }
     } else {
       runningReduceTasks -= 1;
-      if (oldNumAttempts > 1) {
-        speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
-      }
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
+      if (hasSpeculativeReduces) {
+        updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats);
+      }
       // remove the completed reduces from the running reducers set
       retireReduce(tip);
       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
         this.status.setReduceProgress(1.0f);
       }
     }
-    
+    decrementSpeculativeCount(wasSpeculating, tip);
     return true;
   }
-
+  
+  private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, 
+      Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {
+    float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime();
+    DataStatistics ttStats = 
+      trackerStats.get(ttStatus.getTrackerName());
+    double oldMean = 0.0d;
+    //We maintain the mean of TaskTrackers' means. That way, we get a single
+    //data-point for every tracker (used in the evaluation in isSlowTracker)
+    if (ttStats != null) {
+      oldMean = ttStats.mean();
+      ttStats.add(tipDuration);
+      overallStats.updateStatistics(oldMean, ttStats.mean());
+    } else {
+      trackerStats.put(ttStatus.getTrackerName(),
+          (ttStats = new DataStatistics(tipDuration)));
+      overallStats.add(tipDuration);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added mean of " +ttStats.mean() + " to trackerStats of type "+
+          (tip.isMapTask() ? "Map" : "Reduce") +
+          " on "+ttStatus.getTrackerName()+". DataStatistics is now: " +
+          trackerStats.get(ttStatus.getTrackerName()));
+    }
+  }
+  
+  public void updateStatistics(double oldProg, double newProg, boolean isMap) {
+    if (isMap) {   
+      runningMapTaskStats.updateStatistics(oldProg, newProg);
+    } else {
+      runningReduceTaskStats.updateStatistics(oldProg, newProg);
+    }
+  }
+  
+  public DataStatistics getRunningTaskStatistics(boolean isMap) {
+    if (isMap) {
+      return runningMapTaskStats;
+    } else {
+      return runningReduceTaskStats;
+    }
+  }
+  
+  public float getSlowTaskThreshold() {
+    return slowTaskThreshold;
+  }
+  
   /**
    * The job is done since all it's component tasks are either
    * successful or have failed.
@@ -2142,7 +2420,7 @@
       if (reduces.length == 0) {
         this.status.setReduceProgress(1.0f);
       }
-      this.finishTime = System.currentTimeMillis();
+      this.finishTime = jobtracker.getClock().getTime();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
@@ -2164,7 +2442,7 @@
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.FAILED,
                                     status.getJobPriority());
-        this.finishTime = System.currentTimeMillis();
+        this.finishTime = jobtracker.getClock().getTime();
         JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
@@ -2172,7 +2450,7 @@
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.KILLED,
                                     status.getJobPriority());
-        this.finishTime = System.currentTimeMillis();
+        this.finishTime = jobtracker.getClock().getTime();
         JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
@@ -2273,6 +2551,21 @@
     terminate(JobStatus.FAILED);
   }
   
+  private void decrementSpeculativeCount(boolean wasSpeculating, 
+      TaskInProgress tip) {
+    if (wasSpeculating) {
+      if (tip.isMapTask()) {
+        speculativeMapTasks--;
+        LOG.debug("Decrement count. Current speculativeMap task count: " +
+            speculativeMapTasks);
+      } else {
+        speculativeReduceTasks--;
+        LOG.debug("Decremented count. Current speculativeReduce task count: " + 
+            speculativeReduceTasks);
+      }
+    }
+  }
+  
   /**
    * A task assigned to this JobInProgress has reported in as failed.
    * Most of the time, we'll just reschedule execution.  However, after
@@ -2292,9 +2585,11 @@
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
+    boolean wasSpeculating = tip.isSpeculating();
 
     // Mark the taskid as FAILED or KILLED
     tip.incompleteSubTask(taskid, this.status);
+    decrementSpeculativeCount(wasSpeculating, tip);
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
@@ -2477,8 +2772,8 @@
    * @param reason The reason that the task failed
    * @param trackerName The task tracker the task failed on
    */
-  public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, 
-                         TaskStatus.Phase phase, TaskStatus.State state, 
+  public synchronized void failedTask(TaskInProgress tip, TaskAttemptID taskid,
+      String reason, TaskStatus.Phase phase, TaskStatus.State state, 
                          String trackerName) {
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
                                                     taskid,
@@ -2491,10 +2786,10 @@
     // update the actual start-time of the attempt
     TaskStatus oldStatus = tip.getTaskStatus(taskid); 
     long startTime = oldStatus == null
-                     ? System.currentTimeMillis()
+                     ? jobtracker.getClock().getTime()
                      : oldStatus.getStartTime();
     status.setStartTime(startTime);
-    status.setFinishTime(System.currentTimeMillis());
+    status.setFinishTime(jobtracker.getClock().getTime());
     boolean wasComplete = tip.isComplete();
     updateTaskStatus(tip, status);
     boolean isComplete = tip.isComplete();

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=785794&r1=785793&r2=785794&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueClient.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueClient.java Wed Jun 17 20:55:51 2009
@@ -112,12 +112,11 @@
    */
 
   private void displayQueueInfo(String queue, boolean showJobs) throws IOException {
-    JobQueueInfo schedInfo = jc.getQueueInfo(queue);
-    if (schedInfo == null) {
+    JobQueueInfo jobQueueInfo = jc.getQueueInfo(queue);
+    if (jobQueueInfo == null) {
       System.out.printf("Queue Name : %s has no scheduling information \n", queue);
     } else {
-      System.out.printf("Queue Name : %s \n", schedInfo.getQueueName());
-      System.out.printf("Scheduling Info : %s \n",schedInfo.getSchedulingInfo());
+      printJobQueueInfo(jobQueueInfo);
     }
     if (showJobs) {
       System.out.printf("Job List\n");
@@ -128,6 +127,13 @@
     }
   }
 
+  // format and print information about the passed in job queue.
+  private void printJobQueueInfo(JobQueueInfo jobQueueInfo) {
+    System.out.printf("Queue Name : %s \n", jobQueueInfo.getQueueName()); 
+    System.out.printf("Queue State : %s \n", jobQueueInfo.getQueueState());
+    System.out.printf("Scheduling Info : %s \n",jobQueueInfo.getSchedulingInfo());
+  }
+
   /**
    * Method used to display the list of the JobQueues registered
    * with the {@link QueueManager}
@@ -141,8 +147,7 @@
       if(schedInfo.trim().equals("")){
         schedInfo = "N/A";
       }
-      System.out.printf("Queue Name : %s \n", queue.getQueueName());
-      System.out.printf("Scheduling Info : %s \n",queue.getSchedulingInfo());
+      printJobQueueInfo(queue);
     }
   }
 

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueInfo.java?rev=785794&r1=785793&r2=785794&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueInfo.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueInfo.java Wed Jun 17 20:55:51 2009
@@ -37,6 +37,7 @@
   //Once the scheduling information is set there is no way to recover it.
   private String schedulingInfo; 
   
+  private String queueState;
   
   /**
    * Default constructor for Job Queue Info.
@@ -56,6 +57,8 @@
   public JobQueueInfo(String queueName, String schedulingInfo) {
     this.queueName = queueName;
     this.schedulingInfo = schedulingInfo;
+    // make it running by default.
+    this.queueState = Queue.QueueState.RUNNING.getStateName();
   }
   
   
@@ -100,15 +103,33 @@
     }
   }
   
+  /**
+   * Set the state of the queue
+   * @param state state of the queue.
+   */
+  public void setQueueState(String state) {
+    queueState = state;
+  }
+  
+  /**
+   * Return the queue state
+   * @return the queue state.
+   */
+  public String getQueueState() {
+    return queueState;
+  }
+  
   @Override
   public void readFields(DataInput in) throws IOException {
     queueName = Text.readString(in);
+    queueState = Text.readString(in);
     schedulingInfo = Text.readString(in);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, queueName);
+    Text.writeString(out, queueState);
     if(schedulingInfo!= null) {
       Text.writeString(out, schedulingInfo);
     }else {

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=785794&r1=785793&r2=785794&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Wed Jun 17 20:55:51 2009
@@ -61,8 +61,10 @@
    * Version 21: Modified TaskID to be aware of the new TaskTypes                                 
    * Version 22: Added method getQueueAclsForCurrentUser to get queue acls info
    *             for a user
+   * Version 23: Modified the JobQueueInfo class to include queue state.
+   *             Part of HADOOP-5913.            
    */
-  public static final long versionID = 22L;
+  public static final long versionID = 23L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=785794&r1=785793&r2=785794&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jun 17 20:55:51 2009
@@ -138,6 +138,8 @@
   // system files should have 700 permission
   final static FsPermission SYSTEM_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0700); // rwx------
+  
+  private Clock clock;
 
   /**
    * A client tried to submit a job before the Job Tracker was ready.
@@ -165,6 +167,10 @@
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
     
+  public Clock getClock() {
+    return clock;
+  }
+  
   /**
    * Start the JobTracker with given configuration.
    * 
@@ -181,7 +187,7 @@
     JobTracker result = null;
     while (true) {
       try {
-        result = new JobTracker(conf);
+        result = new JobTracker(conf, new Clock());
         result.taskScheduler.setTaskTrackerManager(result);
         break;
       } catch (VersionMismatch e) {
@@ -242,7 +248,7 @@
         try {
           // Every 3 minutes check for any tasks that are overdue
           Thread.sleep(tasktrackerExpiryInterval/3);
-          long now = System.currentTimeMillis();
+          long now = clock.getTime();
           LOG.debug("Starting launching task sweep");
           synchronized (JobTracker.this) {
             synchronized (launchingTasks) {
@@ -295,7 +301,7 @@
     public void addNewTask(TaskAttemptID taskName) {
       synchronized (launchingTasks) {
         launchingTasks.put(taskName, 
-                           System.currentTimeMillis());
+                           clock.getTime());
       }
     }
       
@@ -339,7 +345,7 @@
           synchronized (JobTracker.this) {
             synchronized (taskTrackers) {
               synchronized (trackerExpiryQueue) {
-                long now = System.currentTimeMillis();
+                long now = clock.getTime();
                 TaskTrackerStatus leastRecent = null;
                 while ((trackerExpiryQueue.size() > 0) &&
                        ((leastRecent = trackerExpiryQueue.first()) != null) &&
@@ -405,7 +411,7 @@
         try {
           Thread.sleep(retireJobCheckInterval);
           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
-          long now = System.currentTimeMillis();
+          long now = clock.getTime();
           long retireBefore = now - retireJobInterval;
 
           synchronized (jobs) {
@@ -465,9 +471,9 @@
     long lastUpdated;
     boolean blacklisted; 
 
-    FaultInfo() {
+    FaultInfo(long time) {
       numFaults = 0;
-      lastUpdated = System.currentTimeMillis();
+      lastUpdated = time;
       blacklisted = false;
     }
 
@@ -517,14 +523,15 @@
     void incrementFaults(String hostName) {
       synchronized (potentiallyFaultyTrackers) {
         FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+        long now = clock.getTime();
         if (fi == null) {
-          fi = new FaultInfo();
+          fi = new FaultInfo(now);
           potentiallyFaultyTrackers.put(hostName, fi);
         }
         int numFaults = fi.getFaultCount();
         ++numFaults;
         fi.setFaultCount(numFaults);
-        fi.setLastUpdated(System.currentTimeMillis());
+        fi.setLastUpdated(now);
         if (!fi.isBlacklisted()) {
           if (shouldBlacklist(hostName, numFaults)) {
             LOG.info("Adding " + hostName + " to the blacklist" +
@@ -746,7 +753,7 @@
         
         TaskID id = TaskID.forName(taskId);
         TaskInProgress tip = getTip(id);
-        
+
         updateTip(tip, task);
       }
 
@@ -759,7 +766,7 @@
         
         // Check if the transaction for this attempt can be committed
         String taskStatus = attempt.get(Keys.TASK_STATUS);
-        
+
         if (taskStatus.length() > 0) {
           // This means this is an update event
           if (taskStatus.equals(Values.SUCCESS.name())) {
@@ -1028,7 +1035,7 @@
       TaskTrackerStatus ttStatus = 
         new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
                               0 , 0, 0);
-      ttStatus.setLastSeen(System.currentTimeMillis());
+      ttStatus.setLastSeen(clock.getTime());
 
       synchronized (JobTracker.this) {
         synchronized (taskTrackers) {
@@ -1244,7 +1251,7 @@
     }
 
     public void recover() {
-      long recoveryProcessStartTime = System.currentTimeMillis();
+      long recoveryProcessStartTime = clock.getTime();
       if (!shouldRecover()) {
         // clean up jobs structure
         jobsToRecover.clear();
@@ -1275,7 +1282,7 @@
 
           // check the access
           try {
-            checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB, ugi);
+            checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
           } catch (Throwable t) {
             LOG.warn("Access denied for user " + ugi.getUserName() 
                      + " in groups : [" 
@@ -1312,13 +1319,12 @@
           continue;
         }
       }
-
+      long now = clock.getTime();
       LOG.info("Took a total of " 
-               + StringUtils.formatTime(System.currentTimeMillis() 
+               + StringUtils.formatTime(now 
                                         - recoveryProcessStartTime) 
                + " for recovering filenames of all the jobs from history.");
 
-      long recoveryStartTime = System.currentTimeMillis();
 
       // II. Recover each job
       idIter = jobsToRecover.iterator();
@@ -1375,10 +1381,10 @@
         }
       }
 
-      long recoveryProcessEndTime = System.currentTimeMillis();
+      long recoveryProcessEndTime = clock.getTime();
       LOG.info("Took a total of " 
                + StringUtils.formatTime(recoveryProcessEndTime
-                                        - recoveryStartTime) 
+                                        - now) 
                + " for parsing and recovering all the jobs from history.");
 
       recoveryDuration = recoveryProcessEndTime - recoveryProcessStartTime;
@@ -1565,11 +1571,15 @@
 
   private QueueManager queueManager;
 
+  JobTracker(JobConf conf) throws IOException,InterruptedException{
+    this(conf, new Clock());
+  }
   /**
    * Start the JobTracker process, listen on the indicated port
    */
-  JobTracker(JobConf conf) throws IOException, InterruptedException {
+  JobTracker(JobConf conf, Clock clock) throws IOException, InterruptedException {
     // find the owner of the process
+    this.clock = clock;
     try {
       mrOwner = UnixUserGroupInformation.login(conf);
     } catch (LoginException e) {
@@ -1649,7 +1659,7 @@
         conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
     String infoBindAddress = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
-    this.startTime = System.currentTimeMillis();
+    this.startTime = clock.getTime();
     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
         tmpInfoPort == 0, conf);
     infoServer.setAttribute("job.tracker", this);
@@ -2152,7 +2162,7 @@
     final JobTrackerInstrumentation metrics = getInstrumentation();
     metrics.finalizeJob(conf, id);
     
-    long now = System.currentTimeMillis();
+    long now = clock.getTime();
     
     // mark the job for cleanup at all the trackers
     addJobForCleanup(id);
@@ -2568,7 +2578,7 @@
 
     // First check if the last heartbeat response got through
     String trackerName = status.getTrackerName();
-    long now = System.currentTimeMillis();
+    long now = clock.getTime();
     boolean isBlacklisted = false;
     if (restarted) {
       faultyTrackers.markTrackerHealthy(status.getHost());
@@ -3071,10 +3081,15 @@
       new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
-
+    
+    //check if queue is RUNNING
+    if(!queueManager.isRunning(queue)) {
+      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));      
+      throw new IOException("Queue \"" + queue + "\" is not running");
+    }
     // check for access
     try {
-      checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+      checkAccess(job, Queue.QueueOperation.SUBMIT_JOB);
     } catch (IOException ioe) {
        LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                 + ". Ignoring job " + jobId, ioe);
@@ -3122,7 +3137,7 @@
   // Check whether the specified operation can be performed
   // related to the job.
   private void checkAccess(JobInProgress job, 
-                                QueueManager.QueueOperation oper) 
+                                Queue.QueueOperation oper) 
                                   throws IOException {
     // get the user group info
     UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
@@ -3130,7 +3145,7 @@
   }
 
   // use the passed ugi for checking the access
-  private void checkAccess(JobInProgress job, QueueManager.QueueOperation oper,
+  private void checkAccess(JobInProgress job, Queue.QueueOperation oper,
                            UserGroupInformation ugi) throws IOException {
     // get the queue
     String queue = job.getProfile().getQueueName();
@@ -3192,7 +3207,7 @@
     }
         
     JobStatus prevStatus = (JobStatus)job.getStatus().clone();
-    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+    checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
     job.kill();
     
     // Inform the listeners if the job is killed
@@ -3225,7 +3240,7 @@
             + " is not a valid job");
         return;
     }
-    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+    checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
     JobPriority newPriority = JobPriority.valueOf(priority);
     setJobPriority(jobid, newPriority);
   }
@@ -3450,7 +3465,7 @@
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
-      checkAccess(tip.getJob(), QueueManager.QueueOperation.ADMINISTER_JOBS);
+      checkAccess(tip.getJob(), Queue.QueueOperation.ADMINISTER_JOBS);
       return tip.killTask(taskid, shouldFail);
     }
     else {
@@ -3847,10 +3862,10 @@
   }
 
   @Override
-  public void refreshQueueAcls() throws IOException{
-    LOG.info("Refreshing queue acls. requested by : " + 
+  public void refreshQueues() throws IOException{
+    LOG.info("Refreshing queue information. requested by : " + 
         UserGroupInformation.getCurrentUGI().getUserName());
-    this.queueManager.refreshAcls(new Configuration(this.conf));
+    this.queueManager.refreshQueues(new Configuration(this.conf));
   }
 
   private void initializeTaskMemoryRelatedConfig() {

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=785794&r1=785793&r2=785794&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/QueueManager.java Wed Jun 17 20:55:51 2009
@@ -21,12 +21,14 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Queue.QueueState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
 import org.apache.hadoop.util.StringUtils;
@@ -49,51 +51,24 @@
 class QueueManager {
   
   private static final Log LOG = LogFactory.getLog(QueueManager.class);
-  
-  // Prefix in configuration for queue related keys
-  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX 
-                                                        = "mapred.queue.";
-  // Configured queues
+
+  // Names of the configured queues; a key-set view backed by the queues map below.
   private Set<String> queueNames;
-  // Map of a queue and ACL property name with an ACL
-  private HashMap<String, AccessControlList> aclsMap;
-  // Map of a queue name to any generic object that represents 
-  // scheduler information 
-  private HashMap<String, Object> schedulerInfoObjects;
+
+  // Map of a queue name and Queue object
+  private HashMap<String, Queue> queues;
+
   // Whether ACLs are enabled in the system or not.
   private boolean aclsEnabled;
-  
-  //Resource in which queue acls are configured.
-  static final String QUEUE_ACLS_FILE_NAME = "mapred-queue-acls.xml";
-  
-  /**
-   * Enum representing an operation that can be performed on a queue.
-   */
-  static enum QueueOperation {
-    SUBMIT_JOB ("acl-submit-job", false),
-    ADMINISTER_JOBS ("acl-administer-jobs", true);
-    // TODO: Add ACL for LIST_JOBS when we have ability to authenticate 
-    //       users in UI
-    // TODO: Add ACL for CHANGE_ACL when we have an admin tool for 
-    //       configuring queues.
-    
-    private final String aclName;
-    private final boolean jobOwnerAllowed;
-    
-    QueueOperation(String aclName, boolean jobOwnerAllowed) {
-      this.aclName = aclName;
-      this.jobOwnerAllowed = jobOwnerAllowed;
-    }
 
-    final String getAclName() {
-      return aclName;
-    }
-    
-    final boolean isJobOwnerAllowed() {
-      return jobOwnerAllowed;
-    }
-  }
-  
+  static final String QUEUE_STATE_SUFFIX = "state";
+
+  static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
+
+  // Prefix in configuration for queue related keys
+  static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
+                          = "mapred.queue.";//Resource in which queue acls are configured.
+
   /**
    * Construct a new QueueManager using configuration specified in the passed
    * in {@link org.apache.hadoop.conf.Configuration} object.
@@ -101,10 +76,28 @@
    * @param conf Configuration object where queue configuration is specified.
    */
   public QueueManager(Configuration conf) {
-    queueNames = new TreeSet<String>();
-    aclsMap = new HashMap<String, AccessControlList>();
-    schedulerInfoObjects = new HashMap<String, Object>();
-    initialize(conf);
+    checkDeprecation(conf);
+    conf.addResource(QUEUE_CONF_FILE_NAME);
+
+    queues = new HashMap<String, Queue>();
+
+    // First get the queue names 
+    String[] queueNameValues = conf.getStrings("mapred.queue.names",
+        new String[]{JobConf.DEFAULT_QUEUE_NAME});
+
+    // Get configured ACLs and state for each queue
+    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+    for (String name : queueNameValues) {
+      try {
+        Map<String, AccessControlList> acls = getQueueAcls(name, conf);
+        QueueState state = getQueueState(name, conf);
+        queues.put(name, new Queue(name, acls, state));
+      } catch (Throwable t) {
+        LOG.warn("Not able to initialize queue " + name);
+      }
+    }
+    // Sync queue names with the configured queues.
+    queueNames = queues.keySet();
   }
   
   /**
@@ -121,7 +114,7 @@
   }
   
   /**
-   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * Return true if the given {@link Queue.QueueOperation} can be 
    * performed by the specified user on the given queue.
    * 
    * An operation is allowed if all users are provided access for this
@@ -134,13 +127,14 @@
    * 
    * @return true if the operation is allowed, false otherwise.
    */
-  public synchronized boolean hasAccess(String queueName, QueueOperation oper,
+  public synchronized boolean hasAccess(String queueName, 
+                                Queue.QueueOperation oper,
                                 UserGroupInformation ugi) {
     return hasAccess(queueName, null, oper, ugi);
   }
   
   /**
-   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * Return true if the given {@link Queue.QueueOperation} can be 
    * performed by the specified user on the specified job in the given queue.
    * 
    * An operation is allowed either if the owner of the job is the user 
@@ -148,7 +142,7 @@
    * operation, or if either the user or any of the groups specified is
    * provided access.
    * 
-   * If the {@link QueueManager.QueueOperation} is not job specific then the 
+   * If the {@link Queue.QueueOperation} is not job specific then the 
    * job parameter is ignored.
    * 
    * @param queueName Queue on which the operation needs to be performed.
@@ -160,28 +154,37 @@
    * @return true if the operation is allowed, false otherwise.
    */
   public synchronized boolean hasAccess(String queueName, JobInProgress job, 
-                                QueueOperation oper, 
+                                Queue.QueueOperation oper, 
                                 UserGroupInformation ugi) {
     if (!aclsEnabled) {
       return true;
     }
     
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("checking access for : " + toFullPropertyName(queueName, 
-                                            oper.getAclName()));      
+    Queue q = queues.get(queueName);
+    if (q == null) {
+      LOG.info("Queue " + queueName + " is not present");
+      return false;
     }
     
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checking access for : " 
+          + QueueManager.toFullPropertyName(queueName, oper.getAclName()));
+    }
+
     if (oper.isJobOwnerAllowed()) {
-      if (job != null && job.getJobConf().getUser().equals(ugi.getUserName())) {
+      if (job != null 
+          && job.getJobConf().getUser().equals(ugi.getUserName())) {
         return true;
       }
     }
     
-    AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
+    AccessControlList acl = q.getAcls().get(
+                                toFullPropertyName(queueName, 
+                                    oper.getAclName()));
     if (acl == null) {
       return false;
     }
-    
+
     // Check the ACL list
     boolean allowed = acl.allAllowed();
     if (!allowed) {
@@ -199,8 +202,21 @@
         }
       }
     }
-    
-    return allowed;    
+
+    return allowed;
+  }
+
+  /**
+   * Checks whether the given queue is running or not.
+   * @param queueName name of the queue
+   * @return true, if the queue is running.
+   */
+  synchronized boolean isRunning(String queueName) {
+    Queue q = queues.get(queueName);
+    if (q != null) {
+      return q.getState().equals(QueueState.RUNNING);
+    }
+    return false;
   }
   
   /**
@@ -216,7 +232,8 @@
    */
   public synchronized void setSchedulerInfo(String queueName, 
                                               Object queueInfo) {
-    schedulerInfoObjects.put(queueName, queueInfo);
+    if (queues.get(queueName) != null)
+      queues.get(queueName).setSchedulingInfo(queueInfo);
   }
   
   /**
@@ -225,125 +242,179 @@
    * @param queueName queue for which the scheduling information is required.
    * @return The scheduling information for this queue.
    * 
-   * @see #setSchedulerInfo(String, Object)
+   * @see #setSchedulingInfo(String, Object)
    */
   public synchronized Object getSchedulerInfo(String queueName) {
-    return schedulerInfoObjects.get(queueName);
+    if (queues.get(queueName) != null)
+      return queues.get(queueName).getSchedulingInfo();
+    return null;
   }
   
   /**
-   * Refresh the acls for the configured queues in the system by reading
-   * it from mapred-queue-acls.xml.
+   * Refresh the acls and state for the configured queues in the system 
+   * by reading it from mapred-queues.xml.
    * 
-   * The previous acls are removed. Previously configured queues and
-   * if or not acl is disabled is retained.
+   * Previously configured queues and if or not acls are disabled is retained.
    * 
-   * @throws IOException when queue ACL configuration file is invalid.
+   * @throws IOException when queue configuration file is invalid.
    */
-  synchronized void refreshAcls(Configuration conf) throws IOException {
+  synchronized void refreshQueues(Configuration conf) throws IOException {
+
+    // First check if things are configured in mapred-site.xml,
+    // so we can print out a deprecation warning. 
+    // This check is needed only until we support the configuration
+    // in mapred-site.xml
+    checkDeprecation(conf);
+    
+    // Add the queue configuration file. Values from mapred-site.xml
+    // will be overridden.
+    conf.addResource(QUEUE_CONF_FILE_NAME);
+    
+    // Now we refresh the properties of the queues. Note that we
+    // do *not* refresh the queue names or the acls flag. Instead
+    // we use the older values configured for them.
+    LOG.info("Refreshing acls and state for configured queues.");
     try {
-      HashMap<String, AccessControlList> newAclsMap = 
-        getQueueAcls(conf);
-      aclsMap = newAclsMap;
+      Iterator<String> itr = queueNames.iterator();
+      while(itr.hasNext()) {
+        String name = itr.next();
+        Queue q = queues.get(name);
+        Map<String, AccessControlList> newAcls = getQueueAcls(name, conf);
+        QueueState newState = getQueueState(name, conf);
+        q.setAcls(newAcls);
+        q.setState(newState);
+      }
     } catch (Throwable t) {
       String exceptionString = StringUtils.stringifyException(t);
-      LOG.warn("Queue ACLs could not be refreshed because there was an " +
+      LOG.warn("Queues could not be refreshed because there was an " +
       		"exception in parsing the configuration: "+ exceptionString +
-      		". Existing ACLs are retained.");
+      		". Existing ACLs/state is retained.");
       throw new IOException(exceptionString);
     }
-
   }
   
+  // Check if queue properties are configured in the passed in
+  // configuration. If yes, print out deprecation warning messages.
   private void checkDeprecation(Configuration conf) {
-    for(String queue: queueNames) {
-      for (QueueOperation oper : QueueOperation.values()) {
-        String key = toFullPropertyName(queue, oper.getAclName());
-        String aclString = conf.get(key);
-        if(aclString != null) {
-          LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
-          		"hadoop-site.xml is deprecated. Configure queue ACLs in " + 
-          		QUEUE_ACLS_FILE_NAME);
-          return;
+
+    // check if queues are defined.
+    String[] queues = null;
+    String queueNameValues = conf.get("mapred.queue.names");
+    if (queueNameValues != null) {
+      LOG.warn("Configuring \"mapred.queue.names\" in mapred-site.xml or " +
+          		"hadoop-site.xml is deprecated. Configure " +
+              "\"mapred.queue.names\" in " +
+          		QUEUE_CONF_FILE_NAME);
+      // store queues so we can check if ACLs are also configured
+      // in the deprecated files.
+      queues = conf.getStrings("mapred.queue.names");
+    }
+    
+    // check if the acls flag is defined
+    String aclsEnable = conf.get("mapred.acls.enabled");
+    if (aclsEnable != null) {
+      LOG.warn("Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
+          		"hadoop-site.xml is deprecated. Configure " +
+              "\"mapred.acls.enabled\" in " +
+          		QUEUE_CONF_FILE_NAME);
+    }
+    
+    // check if acls are defined
+    if (queues != null) {
+      for (String queue : queues) {
+        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
+          String key = toFullPropertyName(queue, oper.getAclName());
+          String aclString = conf.get(key);
+          if (aclString != null) {
+            LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
+          	  "hadoop-site.xml is deprecated. Configure queue ACLs in " + 
+          		QUEUE_CONF_FILE_NAME);
+            // even if one string is configured, it is enough for printing
+            // the warning. so we can return from here.
+            return;
+          }
         }
       }
     }
   }
   
-  private HashMap<String, AccessControlList> getQueueAcls(Configuration conf)  {
-    checkDeprecation(conf);
-    conf.addResource(QUEUE_ACLS_FILE_NAME);
-    HashMap<String, AccessControlList> aclsMap = 
-      new HashMap<String, AccessControlList>();
-    for (String queue : queueNames) {
-      for (QueueOperation oper : QueueOperation.values()) {
-        String key = toFullPropertyName(queue, oper.getAclName());
-        String aclString = conf.get(key, "*");
-        aclsMap.put(key, new AccessControlList(aclString));
-      }
-    } 
-    return aclsMap;
+  // Parse ACLs for the queue from the configuration.
+  private Map<String, AccessControlList> getQueueAcls(String name,
+                                                        Configuration conf) {
+    HashMap<String, AccessControlList> map = 
+        new HashMap<String, AccessControlList>();
+    for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
+      String aclKey = toFullPropertyName(name, oper.getAclName());
+      map.put(aclKey, new AccessControlList(conf.get(aclKey, "*")));
+    }
+    return map;
   }
-  
-  private void initialize(Configuration conf) {
-    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
-    String[] queues = conf.getStrings("mapred.queue.names", 
-        new String[] {JobConf.DEFAULT_QUEUE_NAME});
-    addToSet(queueNames, queues);
-    aclsMap = getQueueAcls(conf);
+
+  // Parse ACLs for the queue from the configuration.
+  private QueueState getQueueState(String name, Configuration conf) {
+    QueueState retState = QueueState.RUNNING;
+    String stateVal = conf.get(toFullPropertyName(name,
+                                                  QueueManager.QUEUE_STATE_SUFFIX),
+                               QueueState.RUNNING.getStateName());
+    for (QueueState state : QueueState.values()) {
+      if (state.getStateName().equalsIgnoreCase(stateVal)) {
+        retState = state;
+        break;
+      }
+    }
+    return retState;
   }
-  
-  private static final String toFullPropertyName(String queue, 
+ 
+  public static final String toFullPropertyName(String queue,
       String property) {
     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }
-  
-  private static final void addToSet(Set<String> set, String[] elems) {
-    for (String elem : elems) {
-      set.add(elem);
-    }
-  }
-  
+
   synchronized JobQueueInfo[] getJobQueueInfos() {
     ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
-    for(String queue : queueNames) {
-      Object schedulerInfo = schedulerInfoObjects.get(queue);
-      if(schedulerInfo != null) {
-        queueInfoList.add(new JobQueueInfo(queue,schedulerInfo.toString()));
-      }else {
-        queueInfoList.add(new JobQueueInfo(queue,null));
+    for (String queue : queueNames) {
+      JobQueueInfo queueInfo = getJobQueueInfo(queue);
+      if (queueInfo != null) {
+        queueInfoList.add(getJobQueueInfo(queue));  
       }
     }
-    return (JobQueueInfo[]) queueInfoList.toArray(new JobQueueInfo[queueInfoList
-        .size()]);
+    return (JobQueueInfo[]) queueInfoList.toArray(
+            new JobQueueInfo[queueInfoList.size()]);
   }
 
-  JobQueueInfo getJobQueueInfo(String queue) {
-    Object schedulingInfo = schedulerInfoObjects.get(queue);
-    if(schedulingInfo!=null){
-      return new JobQueueInfo(queue,schedulingInfo.toString());
-    }else {
-      return new JobQueueInfo(queue,null);
+  synchronized JobQueueInfo getJobQueueInfo(String queue) {
+    if (queues.get(queue) != null) {
+      Object schedulingInfo = queues.get(queue).getSchedulingInfo();
+      JobQueueInfo qInfo;
+      if (schedulingInfo != null) {
+        qInfo = new JobQueueInfo(queue, schedulingInfo.toString());
+      } else {
+        qInfo = new JobQueueInfo(queue, null);
+      }
+      qInfo.setQueueState(queues.get(queue).getState().getStateName());
+      return qInfo;
     }
+    return null;
   }
 
   /**
-   * Generates the array of QueueAclsInfo object. The array consists of only those queues
-   * for which user <ugi.getUserName()> has acls
+   * Generates the array of QueueAclsInfo object. 
+   * 
+   * The array consists of only those queues for which user has acls.
    *
    * @return QueueAclsInfo[]
    * @throws java.io.IOException
    */
-  synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation
-          ugi) throws IOException {
+  synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi)
+                                            throws IOException {
     //List of all QueueAclsInfo objects , this list is returned
     ArrayList<QueueAclsInfo> queueAclsInfolist =
             new ArrayList<QueueAclsInfo>();
-    QueueOperation[] operations = QueueOperation.values();
+    Queue.QueueOperation[] operations = Queue.QueueOperation.values();
     for (String queueName : queueNames) {
       QueueAclsInfo queueAclsInfo = null;
       ArrayList<String> operationsAllowed = null;
-      for (QueueOperation operation : operations) {
+      for (Queue.QueueOperation operation : operations) {
         if (hasAccess(queueName, operation, ugi)) {
           if (operationsAllowed == null) {
             operationsAllowed = new ArrayList<String>();
@@ -363,4 +434,10 @@
             queueAclsInfolist.size()]);
   }
 
+  // ONLY FOR TESTING - Do not use in production code.
+  synchronized void setQueues(Queue[] queues) {
+    for (Queue queue : queues) {
+      this.queues.put(queue.getName(), queue);
+    }
+  }
 }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=785794&r1=785793&r2=785794&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Jun 17 20:55:51 2009
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.net.Node;
@@ -52,9 +53,8 @@
  * **************************************************************
  */
 class TaskInProgress {
-  static final int MAX_TASK_EXECS = 1;
+  static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently  
   int maxTaskAttempts = 4;    
-  static final double SPECULATIVE_GAP = 0.2;
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
 
@@ -74,9 +74,10 @@
   private int numTaskFailures = 0;
   private int numKilledTasks = 0;
   private double progress = 0;
+  private double oldProgressRate;
   private String state = "";
-  private long startTime = 0;
-  private long execStartTime = 0;
+  private long dispatchTime = 0;   // most recent time task attempt given to TT
+  private long execStartTime = 0;  // when we started first task-attempt
   private long execFinishTime = 0;
   private int completes = 0;
   private boolean failed = false;
@@ -220,7 +221,6 @@
    * Initialization common to Map and Reduce
    */
   void init(JobID jobId) {
-    this.startTime = System.currentTimeMillis();
     this.id = new TaskID(jobId, isMapTask() ? TaskType.MAP : TaskType.REDUCE,
         partition);
     this.skipping = startSkipping();
@@ -229,12 +229,19 @@
   ////////////////////////////////////
   // Accessors, info, profiles, etc.
   ////////////////////////////////////
-
+  
   /**
-   * Return the start time
+   * Return the dispatch time
    */
-  public long getStartTime() {
-    return startTime;
+  public long getDispatchTime(){
+    return this.dispatchTime;
+  }
+  
+  /**
+   * Set the dispatch time
+   */
+  public void setDispatchTime(long disTime){
+    this.dispatchTime = disTime;
   }
   
   /**
@@ -399,9 +406,15 @@
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
+      if (isComplete() && !isComplete(taskid)) {
+        addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" +
+            " already SUCCEEDED");
+      }      
     } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
+      addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" +
+           " went to COMMIT_PENDING state earlier");
       close = true; 
     } else {
       close = tasksToKill.keySet().contains(taskid);
@@ -562,6 +575,17 @@
     // but finishTime has to be updated.
     if (!isCleanupAttempt(taskid)) {
       taskStatuses.put(taskid, status);
+      if ((isMapTask() && job.hasSpeculativeMaps()) || 
+          (!isMapTask() && job.hasSpeculativeReduces())) {
+        long now = jobtracker.getClock().getTime();
+        double oldProgRate = getOldProgressRate();
+        double currProgRate = getCurrentProgressRate(now);
+        job.updateStatistics(oldProgRate, currProgRate, isMapTask());
+        //we need to store the current progress rate, so that we can
+        //update statistics accurately the next time we invoke
+        //updateStatistics
+        setProgressRate(currProgRate);
+      }
     } else {
       taskStatuses.get(taskid).statusUpdate(status.getRunState(),
         status.getProgress(), status.getStateString(), status.getPhase(),
@@ -619,7 +643,7 @@
 
       // tasktracker went down and failed time was not reported. 
       if (0 == status.getFinishTime()){
-        status.setFinishTime(System.currentTimeMillis());
+        status.setFinishTime(jobtracker.getClock().getTime());
       }
     }
 
@@ -723,7 +747,7 @@
     //
 
     this.completes++;
-    this.execFinishTime = System.currentTimeMillis();
+    this.execFinishTime = jobtracker.getClock().getTime();
     recomputeProgress();
     
   }
@@ -762,7 +786,7 @@
     }
     this.failed = true;
     killed = true;
-    this.execFinishTime = System.currentTimeMillis();
+    this.execFinishTime = jobtracker.getClock().getTime();
     recomputeProgress();
   }
 
@@ -860,35 +884,39 @@
   }
     
   /**
-   * Return whether the TIP has a speculative task to run.  We
-   * only launch a speculative task if the current TIP is really
-   * far behind, and has been behind for a non-trivial amount of 
-   * time.
+   * Can this task be speculated? This requires that it isn't done or almost
+   * done and that it isn't already being speculatively executed.
+   * 
+   * Added for use by queue scheduling algorithms.
+   * @param currentTime 
    */
-  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
-    //
-    // REMIND - mjc - these constants should be examined
-    // in more depth eventually...
-    //
-      
-    if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
-        (averageProgress - progress >= SPECULATIVE_GAP) &&
-        (currentTime - startTime >= SPECULATIVE_LAG) 
-        && completes == 0 && !isOnlyCommitPending()) {
-      return true;
-    }
-    return false;
+  boolean canBeSpeculated(long currentTime) {
+    DataStatistics taskStats = job.getRunningTaskStatistics(isMapTask());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("activeTasks.size(): " + activeTasks.size() + " "
+          + activeTasks.firstKey() + " task's progressrate: " + 
+          getCurrentProgressRate(currentTime) + 
+          " taskStats : " + taskStats);
+    }
+    return (!skipping && isRunnable() && isRunning() &&
+        activeTasks.size() <= MAX_TASK_EXECS &&
+        currentTime - dispatchTime >= SPECULATIVE_LAG &&
+        completes == 0 && !isOnlyCommitPending() &&
+        (taskStats.mean() - getCurrentProgressRate(currentTime) >
+              taskStats.std() * job.getSlowTaskThreshold()));
   }
-    
+  
+  /**
+   * Is the task currently speculating?
+   */
+  boolean isSpeculating() {
+   return (activeTasks.size() > MAX_TASK_EXECS);
+  }
+  
   /**
    * Return a Task that can be sent to a TaskTracker for execution.
    */
-  public Task getTaskToRun(String taskTracker) throws IOException {
-    if (0 == execStartTime){
-      // assume task starts running now
-      execStartTime = System.currentTimeMillis();
-    }
-
+  public Task getTaskToRun(String taskTracker) throws IOException {   
     // Create the 'taskid'; do not count the 'killed' tasks against the job!
     TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
@@ -903,6 +931,16 @@
       return null;
     }
 
+    //keep track of the last time we started an attempt at this TIP
+    //used to calculate the progress rate of this TIP
+    setDispatchTime(jobtracker.getClock().getTime());
+ 
+    //set this the first time we run a taskAttempt in this TIP
+    //each Task attempt has its own TaskStatus, which tracks that
+    //attempts execStartTime, thus this startTime is TIP wide.
+    if (0 == execStartTime){
+      setExecStartTime(dispatchTime);
+    }
     return addRunningTask(taskid, taskTracker);
   }
   
@@ -1084,6 +1122,34 @@
   }
   
   /**
+   * Compare most recent task attempts dispatch time to current system time so
+   * that task progress rate will slow down as time proceeds even if no progress
+   * is reported for the task. This allows speculative tasks to be launched for
+   * tasks on slow/dead TT's before we realize the TT is dead/slow. Skew isn't
+   * an issue since both times are from the JobTrackers perspective.
+   * @return the progress rate from the active task that is doing best
+   */
+  public double getCurrentProgressRate(long currentTime) {
+    double bestProgressRate = 0;
+    for (TaskStatus ts : taskStatuses.values()){
+      double progressRate = ts.getProgress()/Math.max(1,
+          currentTime - dispatchTime);
+      if ((ts.getRunState() == TaskStatus.State.RUNNING  || 
+          ts.getRunState() == TaskStatus.State.SUCCEEDED) &&
+          progressRate > bestProgressRate){
+        bestProgressRate = progressRate;
+      }
+    }
+    return bestProgressRate;
+  }
+  
+  private void setProgressRate(double rate) {
+    oldProgressRate = rate;
+  }
+  private double getOldProgressRate() {
+    return oldProgressRate;
+  }
+  /**
    * This class keeps the records to be skipped during further executions 
    * based on failed records from all the previous attempts.
    * It also narrow down the skip records if it is more than the 



Mime
View raw message