hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r785065 - in /hadoop/core/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib...
Date Tue, 16 Jun 2009 04:11:15 GMT
Author: ddas
Date: Tue Jun 16 04:11:14 2009
New Revision: 785065

URL: http://svn.apache.org/viewvc?rev=785065&view=rev
Log:
HADOOP-2141. Improves the speculative execution heuristic. The heuristic is currently based on the progress-rates of tasks and the expected time to complete. Also, statistics about trackers are collected, and speculative tasks are not given to the ones deduced to be slow. Contributed by Andy Konwinski and Devaraj Das.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Clock.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/core/trunk/src/mapred/mapred-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jun 16 04:11:14 2009
@@ -447,6 +447,12 @@
     HADOOP-5938. Change org.apache.hadoop.mapred.jobcontrol to use new
     api. (Amareshwari Sriramadasu via sharad)
 
+    HADOOP-2141. Improves the speculative execution heuristic. The heuristic
+    is currently based on the progress-rates of tasks and the expected time
+    to complete. Also, statistics about trackers are collected, and speculative
+    tasks are not given to the ones deduced to be slow. 
+    (Andy Konwinski and ddas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Jun 16 04:11:14 2009
@@ -585,7 +585,7 @@
       for(TaskInProgress tip : tips)  {
         if(tip.isRunning() 
             && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName())) 
-            && tip.hasSpeculativeTask(currentTime, progress)) {
+            && tip.canBeSpeculated(currentTime)) {
           return true;
         }
       }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Jun 16 04:11:14 2009
@@ -164,7 +164,7 @@
     private int speculativeReduceTaskCounter = 0;
     public FakeJobInProgress(JobID jId, JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(jId, jobConf);
+      super(jId, jobConf, null);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP);
@@ -381,7 +381,7 @@
      *hasSpeculativeMap and hasSpeculativeReduce is reset by FakeJobInProgress
      *after the speculative tip has been scheduled.
      */
-    boolean hasSpeculativeTask(long currentTime, double averageProgress) {
+    boolean canBeSpeculated(long currentTime) {
       if(isMap && hasSpeculativeMap) {
         return fakeJob.getJobConf().getMapSpeculativeExecution();
       } 

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java Tue Jun 16 04:11:14 2009
@@ -33,9 +33,8 @@
   public int neededSpeculativeMaps(JobInProgress job) {
     int count = 0;
     long time = System.currentTimeMillis();
-    double avgProgress = job.getStatus().mapProgress();
     for (TaskInProgress tip: job.maps) {
-      if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+      if (tip.isRunning() && tip.canBeSpeculated(time)) {
         count++;
       }
     }
@@ -48,7 +47,7 @@
     long time = System.currentTimeMillis();
     double avgProgress = job.getStatus().reduceProgress();
     for (TaskInProgress tip: job.reduces) {
-      if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+      if (tip.isRunning() && tip.canBeSpeculated(time)) {
         count++;
       }
     }

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Jun 16 04:11:14 2009
@@ -87,15 +87,6 @@
     double reduceFairShare = 0; // Fair share of reduce slots at last update
   }
   
-  /**
-   * A clock class - can be mocked out for testing.
-   */
-  static class Clock {
-    long getTime() {
-      return System.currentTimeMillis();
-    }
-  }
-  
   public FairScheduler() {
     this(new Clock(), true);
   }

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Jun 16 04:11:14 2009
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -52,7 +53,7 @@
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+      super(new JobID("test", ++jobCounter), jobConf, null);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus();
@@ -220,19 +221,6 @@
     }
   }
   
-  protected class FakeClock extends FairScheduler.Clock {
-    private long time = 0;
-    
-    public void advance(long millis) {
-      time += millis;
-    }
-
-    @Override
-    long getTime() {
-      return time;
-    }
-  }
-  
   protected JobConf conf;
   protected FairScheduler scheduler;
   private FakeTaskTrackerManager taskTrackerManager;

Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Tue Jun 16 04:11:14 2009
@@ -501,6 +501,31 @@
   <description>If true, then multiple instances of some reduce tasks 
                may be executed in parallel.</description>
 </property>
+<property>
+  <name>mapred.speculative.execution.speculativeCap</name>
+  <value>0.1</value>
+  <description>The max percent (0-1) of running tasks that
+  can be speculatively re-executed at any time.</description>
+</property>
+ 
+<property>
+  <name>mapred.speculative.execution.slowTaskThreshold</name>
+  <value>1.0</value>
+  <description>The number of standard deviations by which a task's 
+  ave progress-rates must be lower than the average of all running tasks'
+  for the task to be considered too slow.
+  </description>
+</property>
+
+<property>
+  <name>mapred.speculative.execution.slowNodeThreshold</name>
+  <value>1.0</value>
+  <description>The number of standard deviations by which a Task 
+  Tracker's ave map and reduce progress-rates (finishTime-dispatchTime)
+  must be lower than the average of all successful map/reduce task's for
+  the TT to be considered too slow to give a speculative task to.
+  </description>
+</property>
 
 <property>
   <name>mapred.job.reuse.jvm.num.tasks</name>

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Clock.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Clock.java?rev=785065&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Clock.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Clock.java Tue Jun 16 04:11:14 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A clock class - can be mocked out for testing.
+ */
+class Clock {
+  long getTime() {
+    return System.currentTimeMillis();
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Tue Jun 16 04:11:14 2009
@@ -93,6 +93,16 @@
   }
 
   /**
+   * Construct a new cluster status.
+   * 
+   * @param trackers no. of tasktrackers in the cluster
+   * @param blacklists no. of blacklisted task trackers in the cluster
+   * @param ttExpiryInterval the tasktracker expiry interval
+   * @param maps no. of currently running map-tasks in the cluster
+   * @param reduces no. of currently running reduce-tasks in the cluster
+   * @param maxMaps the maximum no. of map tasks in the cluster
+   * @param maxReduces the maximum no. of reduce tasks in the cluster
+   * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
    * @param numDecommissionedNodes number of decommission trackers
    */
   ClusterStatus(int trackers, int blacklists, long ttExpiryInterval, 
@@ -133,6 +143,16 @@
   }
 
   /**
+   * Construct a new cluster status.
+   * 
+   * @param activeTrackers active tasktrackers in the cluster
+   * @param blacklistedTrackers blacklisted tasktrackers in the cluster
+   * @param ttExpiryInterval the tasktracker expiry interval
+   * @param maps no. of currently running map-tasks in the cluster
+   * @param reduces no. of currently running reduce-tasks in the cluster
+   * @param maxMaps the maximum no. of map tasks in the cluster
+   * @param maxReduces the maximum no. of reduce tasks in the cluster
+   * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
    * @param numDecommissionNodes number of decommission trackers
    */
   ClusterStatus(Collection<String> activeTrackers, 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Jun 16 04:11:14 2009
@@ -21,6 +21,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -103,7 +107,7 @@
   private volatile boolean jobFailed = false;
 
   JobPriority priority = JobPriority.NORMAL;
-  final JobTracker jobtracker;
+  protected JobTracker jobtracker;
 
   // NetworkTopology Node to the set of TIPs
   Map<Node, List<TaskInProgress>> nonRunningMapCache;
@@ -129,14 +133,14 @@
   // A list of cleanup tasks for the reduce task attempts, to be launched
   List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
 
-  private final int maxLevel;
+  private int maxLevel;
 
   /**
    * A special value indicating that 
    * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
    * schedule any available map tasks for this job, including speculative tasks.
    */
-  private final int anyCacheLevel;
+  private int anyCacheLevel;
   
   /**
    * A special value indicating that 
@@ -189,9 +193,13 @@
   
   private MetricsRecord jobMetrics;
   
-  // Maximum no. of fetch-failure notifications after which
-  // the map task is killed
+  // Maximum no. of fetch-failure notifications after which map task is killed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+
+  // Don't lower speculativeCap below one TT's worth (for small clusters)
+  private static final int MIN_SPEC_CAP = 10;
+  
+  private static final float MIN_SLOTS_CAP = 0.01f;
   
   // Map of mapTaskId -> no. of fetch failures
   private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
@@ -199,19 +207,65 @@
 
   private Object schedulingInfo;
 
-  
+  //thresholds for speculative execution
+  private float slowTaskThreshold;
+  private float speculativeCap;
+  private float slowNodeThreshold; //standard deviations
+
+  //Statistics are maintained for a couple of things
+  //mapTaskStats is used for maintaining statistics about
+  //the completion time of map tasks on the trackers. On a per
+  //tracker basis, the mean time for task completion is maintained
+  private DataStatistics mapTaskStats = new DataStatistics();
+  //reduceTaskStats is used for maintaining statistics about
+  //the completion time of reduce tasks on the trackers. On a per
+  //tracker basis, the mean time for task completion is maintained
+  private DataStatistics reduceTaskStats = new DataStatistics();
+  //trackerMapStats used to maintain a mapping from the tracker to
+  //the statistics about completion time of map tasks
+  private Map<String,DataStatistics> trackerMapStats = 
+    new HashMap<String,DataStatistics>();
+  //trackerReduceStats used to maintain a mapping from the tracker to
+  //the statistics about completion time of reduce tasks
+  private Map<String,DataStatistics> trackerReduceStats = 
+    new HashMap<String,DataStatistics>();
+  //runningMapStats used to maintain the RUNNING map tasks' statistics 
+  private DataStatistics runningMapTaskStats = new DataStatistics();
+  //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
+  private DataStatistics runningReduceTaskStats = new DataStatistics();
+ 
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
    */
-  protected JobInProgress(JobID jobid, JobConf conf) {
+  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
     this.conf = conf;
     this.jobId = jobid;
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
     this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
     this.anyCacheLevel = this.maxLevel+1;
-    this.jobtracker = null;
+    this.jobtracker = tracker;
     this.restartCount = 0;
+    
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.resourceEstimator = new ResourceEstimator(this);
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
+    (numMapTasks + numReduceTasks + 10);
+    
+    this.slowTaskThreshold = Math.max(0.0f,
+        conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+    this.speculativeCap = conf.getFloat(
+        "mapred.speculative.execution.speculativeCap",0.1f);
+    this.slowNodeThreshold = conf.getFloat(
+        "mapred.speculative.execution.slowNodeThreshold",1.0f);
+
   }
   
   /**
@@ -285,6 +339,19 @@
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
+    
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.slowTaskThreshold = Math.max(0.0f,
+        conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+    this.speculativeCap = conf.getFloat(
+        "mapred.speculative.execution.speculativeCap",0.1f);
+    this.slowNodeThreshold = conf.getFloat(
+        "mapred.speculative.execution.slowNodeThreshold",1.0f);
+
   }
 
   /**
@@ -443,7 +510,7 @@
     }
         
     // set the launch time
-    this.launchTime = System.currentTimeMillis();
+    this.launchTime = jobtracker.getClock().getTime();
 
     //
     // Create reduce tasks
@@ -939,9 +1006,9 @@
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
-        
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel,
-                                status.mapProgress());
+       
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
+        anyCacheLevel);
     if (target == -1) {
       return null;
     }
@@ -1001,8 +1068,7 @@
       return null;
     }
 
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
-                                status.mapProgress());
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
     if (target == -1) {
       return null;
     }
@@ -1025,7 +1091,7 @@
     }
 
     int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
-                                NON_LOCAL_CACHE_LEVEL, status.mapProgress());
+                                NON_LOCAL_CACHE_LEVEL);
     if (target == -1) {
       return null;
     }
@@ -1203,8 +1269,7 @@
       return null;
     }
 
-    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts, 
-                                    status.reduceProgress());
+    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts);
     if (target == -1) {
       return null;
     }
@@ -1267,15 +1332,21 @@
       name = Values.MAP.name();
       counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
-      if (tip.getActiveTasks().size() > 1)
+      if (tip.isSpeculating()) {
         speculativeMapTasks++;
+        LOG.debug("Chosen speculative task, current speculativeMap task count: "
+            + speculativeMapTasks);
+      }
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
       counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
-      if (tip.getActiveTasks().size() > 1)
+      if (tip.isSpeculating()) {
         speculativeReduceTasks++;
+        LOG.debug("Chosen speculative task, current speculativeReduce task count: "
+          + speculativeReduceTasks);
+      }
       metrics.launchReduce(id);
     }
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
@@ -1433,7 +1504,7 @@
     String[] splitLocations = tip.getSplitLocations();
 
     // Remove the TIP from the list for running non-local maps
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalRunningMaps.remove(tip);
       return;
     }
@@ -1473,8 +1544,9 @@
    * Adds a map tip to the list of running maps.
    * @param tip the tip that needs to be scheduled as running
    */
-  private synchronized void scheduleMap(TaskInProgress tip) {
+  protected synchronized void scheduleMap(TaskInProgress tip) {
     
+    runningMapTaskStats.add(0.0f);
     if (runningMapCache == null) {
       LOG.warn("Running cache for maps is missing!! " 
                + "Job details are missing.");
@@ -1483,7 +1555,7 @@
     String[] splitLocations = tip.getSplitLocations();
 
     // Add the TIP to the list of non-local running TIPs
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalRunningMaps.add(tip);
       return;
     }
@@ -1508,7 +1580,8 @@
    * Adds a reduce tip to the list of running reduces
    * @param tip the tip that needs to be scheduled as running
    */
-  private synchronized void scheduleReduce(TaskInProgress tip) {
+  protected synchronized void scheduleReduce(TaskInProgress tip) {
+    runningReduceTaskStats.add(0.0f);
     if (runningReduces == null) {
       LOG.warn("Running cache for reducers missing!! "
                + "Job details are missing.");
@@ -1612,57 +1685,71 @@
     return null;
   }
   
+  public boolean hasSpeculativeMaps() {
+    return hasSpeculativeMaps;
+  }
+
+  public boolean hasSpeculativeReduces() {
+    return hasSpeculativeReduces;
+  }
+
   /**
-   * Find a speculative task
-   * @param list a list of tips
-   * @param taskTracker the tracker that has requested a tip
-   * @param avgProgress the average progress for speculation
-   * @param currentTime current time in milliseconds
-   * @param shouldRemove whether to remove the tips
-   * @return a tip that can be speculated on the tracker
-   */
-  private synchronized TaskInProgress findSpeculativeTask(
-      Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
-      double avgProgress, long currentTime, boolean shouldRemove) {
+   * Retrieve a task for speculation.
+   * If a task slot becomes available and there are less than SpeculativeCap
+   * speculative tasks running: 
+   *  1)Ignore the request if the TT's progressRate is < SlowNodeThreshold
+   *  2)Choose candidate tasks - those tasks whose progress rate is below
+   *    slowTaskThreshold * mean(progress-rates)
+   *  3)Speculate task that's expected to complete last
+   * @param list pool of tasks to choose from
+   * @param taskTrackerName the name of the TaskTracker asking for a task
+   * @param taskTrackerHost the hostname of the TaskTracker asking for a task
+   * @return the TIP to speculatively re-execute
+   */
+  protected synchronized TaskInProgress findSpeculativeTask(
+      Collection<TaskInProgress> list, String taskTrackerName, 
+      String taskTrackerHost) {
+    if (list.isEmpty()) {
+      return null;
+    }
+    long now = jobtracker.getClock().getTime();
+    if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list)) {
+      return null;
+    }
+    // List of speculatable candidates, start with all, and chop it down
+    ArrayList<TaskInProgress> candidates = new ArrayList<TaskInProgress>(list);
     
-    Iterator<TaskInProgress> iter = list.iterator();
-
+    Iterator<TaskInProgress> iter = candidates.iterator();
     while (iter.hasNext()) {
       TaskInProgress tip = iter.next();
-      // should never be true! (since we delete completed/failed tasks)
-      if (!tip.isRunning()) {
-        iter.remove();
-        continue;
-      }
-
-      if (!tip.hasRunOnMachine(ttStatus.getHost(), 
-                               ttStatus.getTrackerName())) {
-        if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
-          // In case of shared list we don't remove it. Since the TIP failed 
-          // on this tracker can be scheduled on some other tracker.
-          if (shouldRemove) {
-            iter.remove(); //this tracker is never going to run it again
-          }
-          return tip;
-        } 
-      } else {
-        // Check if this tip can be removed from the list.
-        // If the list is shared then we should not remove.
-        if (shouldRemove) {
-          // This tracker will never speculate this tip
+      if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
+          !tip.canBeSpeculated(now)) {
+          //remove it from candidates
           iter.remove();
-        }
       }
     }
-    return null;
+    //resort according to expected time till completion
+    Comparator<TaskInProgress> LateComparator = 
+      new EstimatedTimeLeftComparator(now);
+    Collections.sort(candidates, LateComparator);
+    if (candidates.size() > 0 ) {
+      TaskInProgress tip = candidates.get(0);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Chose task " + tip.getTIPId() + ". Statistics: Task's : " +
+            tip.getCurrentProgressRate(now) + " Job's : " + 
+            (tip.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
+      }
+      return tip;
+    } else {
+      return null;
+    }
   }
-  
+
   /**
    * Find new map task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param numUniqueHosts The number of hosts that run task trackers
-   * @param avgProgress The average progress of this kind of task in this job
    * @param maxCacheLevel The maximum topology level until which to schedule
    *                      maps. 
    *                      A value of {@link #anyCacheLevel} implies any 
@@ -1675,14 +1762,14 @@
   private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
                                           final int clusterSize,
                                           final int numUniqueHosts,
-                                          final int maxCacheLevel,
-                                          final double avgProgress) {
+                                          final int maxCacheLevel) {
+    String taskTrackerName = tts.getTrackerName();
+    String taskTrackerHost = tts.getHost();
     if (numMapTasks == 0) {
       LOG.info("No maps to schedule for " + profile.getJobID());
       return -1;
     }
 
-    String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
     //
@@ -1694,7 +1781,7 @@
       return -1;
     }
     
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!shouldRunOnTaskTracker(taskTrackerName)) {
       return -1;
     }
 
@@ -1821,82 +1908,61 @@
     // 
  
     if (hasSpeculativeMaps) {
-      long currentTime = System.currentTimeMillis();
-
-      // 1. Check bottom up for speculative tasks from the running cache
-      if (node != null) {
-        Node key = node;
-        for (int level = 0; level < maxLevel; ++level) {
-          Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
-          if (cacheForLevel != null) {
-            tip = findSpeculativeTask(cacheForLevel, tts, 
-                                      avgProgress, currentTime, level == 0);
-            if (tip != null) {
-              if (cacheForLevel.size() == 0) {
-                runningMapCache.remove(key);
-              }
-              return tip.getIdWithinJob();
-            }
-          }
-          key = key.getParent();
-        }
+      tip = getSpeculativeMap(taskTrackerName, taskTrackerHost);
+      if (tip != null) {
+        return tip.getIdWithinJob();
       }
+    }
+   return -1;
+  }
 
-      // 2. Check breadth-wise for speculative tasks
-      
-      for (Node parent : nodesAtMaxLevel) {
-        // ignore the parent which is already scanned
-        if (parent == nodeParentAtMaxLevel) {
-          continue;
-        }
+  private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName, 
+      String taskTrackerHost) {
 
-        Set<TaskInProgress> cache = runningMapCache.get(parent);
-        if (cache != null) {
-          tip = findSpeculativeTask(cache, tts, avgProgress, 
-                                    currentTime, false);
-          if (tip != null) {
-            // remove empty cache entries
-            if (cache.size() == 0) {
-              runningMapCache.remove(parent);
-            }
-            LOG.info("Choosing a non-local task " + tip.getTIPId() 
-                     + " for speculation");
-            return tip.getIdWithinJob();
-          }
-        }
-      }
-
-      // 3. Check non-local tips for speculation
-      tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, 
-                                currentTime, false);
-      if (tip != null) {
-        LOG.info("Choosing a non-local task " + tip.getTIPId() 
-                 + " for speculation");
-        return tip.getIdWithinJob();
+    //////// Populate allTips with all TaskInProgress
+    Set<TaskInProgress> allTips = new HashSet<TaskInProgress>();
+    
+    // collection of node at max level in the cache structure
+    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
+    // Add all tasks from max-level nodes breadth-wise
+    for (Node parent : nodesAtMaxLevel) {
+      Set<TaskInProgress> cache = runningMapCache.get(parent);
+      if (cache != null) {
+        allTips.addAll(cache);
       }
     }
+    // Add all non-local TIPs
+    allTips.addAll(nonLocalRunningMaps);
     
-    return -1;
+    ///////// Select a TIP to run on
+    TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName, 
+        taskTrackerHost);
+    
+    if (tip != null) {
+      LOG.info("Choosing map task " + tip.getTIPId() + 
+          " for speculative execution");
+    } else {
+      LOG.debug("No speculative map task found for tracker " + taskTrackerName);
+    }
+    return tip;
   }
-
+  
   /**
    * Find new reduce task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param numUniqueHosts The number of hosts that run task trackers
-   * @param avgProgress The average progress of this kind of task in this job
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
                                              int clusterSize,
-                                             int numUniqueHosts,
-                                             double avgProgress) {
+                                             int numUniqueHosts) {
+    String taskTrackerName = tts.getTrackerName();
+    String taskTrackerHost = tts.getHost();
     if (numReduceTasks == 0) {
       LOG.info("No reduces to schedule for " + profile.getJobID());
       return -1;
     }
-
-    String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
 
     // Update the last-known clusterSize
@@ -1906,14 +1972,14 @@
       return -1;
     }
 
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!shouldRunOnTaskTracker(taskTrackerName)) {
       return -1;
     }
 
     long outSize = resourceEstimator.getEstimatedReduceInputSize();
     long availSpace = tts.getResourceStatus().getAvailableSpace();
     if(availSpace < outSize) {
-      LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
+      LOG.warn("No room for reduce task. Node " + taskTrackerName + " has " +
                 availSpace + 
                " bytes free; but we expect reduce input to take " + outSize);
 
@@ -1930,16 +1996,187 @@
 
     // 2. check for a reduce tip to be speculated
     if (hasSpeculativeReduces) {
-      tip = findSpeculativeTask(runningReduces, tts, avgProgress, 
-                                System.currentTimeMillis(), false);
+      tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost);
       if (tip != null) {
-        scheduleReduce(tip);
         return tip.getIdWithinJob();
       }
     }
 
     return -1;
   }
+  /** Picks a running reduce to speculate on the given tracker; returns null when none is found. */
+  private synchronized TaskInProgress getSpeculativeReduce(
+      String taskTrackerName, String taskTrackerHost) {
+    TaskInProgress tip = findSpeculativeTask(
+        runningReduces, taskTrackerName, taskTrackerHost);
+    if (tip != null) {
+      LOG.info("Choosing reduce task " + tip.getTIPId() + 
+          " for speculative execution");
+    } else {
+      LOG.debug("No speculative reduce task found for tracker " + taskTrackerName);
+    }
+    return tip;
+  }
+
+    /**
+     * Check whether the number of speculative tasks (maps plus reduces)
+     * currently being executed has already reached the allowed maximum.
+     * @param tasks the set of tasks to test
+     * @return has the cap been reached? (also true when tasks is empty)
+     */
+   private boolean atSpeculativeCap(Collection<TaskInProgress> tasks) {
+     float numTasks = tasks.size();
+     if (numTasks == 0){
+       return true; // avoid divide by zero
+     }
+
+     //return false (cap not reached) if totalSpecTask < max(10, 
+     //                     0.01 * total-slots, 0.1 * total-running-tasks)
+
+     if (speculativeMapTasks + speculativeReduceTasks < MIN_SPEC_CAP) {
+       return false; // at least one slow tracker's worth of slots(default=10)
+     }
+     ClusterStatus c = jobtracker.getClusterStatus(false); 
+     int numSlots = c.getMaxMapTasks() + c.getMaxReduceTasks();
+     if ((float)(speculativeMapTasks + speculativeReduceTasks) < 
+       numSlots * MIN_SLOTS_CAP) {
+       return false;
+     }
+     boolean atCap = (((float)(speculativeMapTasks+
+         speculativeReduceTasks)/numTasks) >= speculativeCap);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " +
+           ((float)(speculativeMapTasks+speculativeReduceTasks)/numTasks)+
+           ", so atSpecCap() is returning "+atCap);
+     }
+     return atCap;
+   }
+  
+  /**
+   * Orders tasks by estimated time to completion, longest remaining first.
+   */
+  private static class EstimatedTimeLeftComparator 
+  implements Comparator<TaskInProgress> {
+    private long time;
+    public EstimatedTimeLeftComparator(long now) {
+      this.time = now;
+    }
+    /**
+     * Estimated time to completion is measured as:
+     *   % of task left to complete (1 - progress) / progress rate of the task.
+     * 
+     * This assumes that tasks are linear in their progress, which is 
+     * often wrong, especially since progress for reducers is currently
+     * calculated by evenly weighting their three stages (shuffle, sort, reduce)
+     * which rarely account for 1/3 each. This should be fixed in the future
+     * by calculating progressRate more intelligently or splitting these
+     * multi-phase tasks into individual tasks.
+     * 
+     * The ordering this comparator defines is: task1 < task2 if task1 is
+     * estimated to finish farther in the future => compare(t1,t2) returns -1
+     */
+    public int compare(TaskInProgress tip1, TaskInProgress tip2) {
+      //We sort on the inverse of time_remaining = [(1 - prog) / progRate],
+      //i.e. on progRate / (1 - prog), so that (1 - prog) rather than
+      //progRate ends up in the denominator. Tasks can have arbitrarily
+      //low progRates in practice (e.g. a task that is half done after 1000
+      //seconds will have progRate of 0.0000005), so progRate is a poor
+      //quantity to clamp. The denominator is zero only when prog = 1, and
+      //we guard against that divide by zero by applying Math.max to
+      //(1 - prog). This makes tasks with prog=1 look 99.99% done instead
+      //of 100%, which is okay.
+      //A smaller value therefore means more estimated time left.
+      double t1 = tip1.getCurrentProgressRate(time) / Math.max(0.0001, 
+          1.0 - tip1.getProgress());
+      double t2 = tip2.getCurrentProgressRate(time) / Math.max(0.0001, 
+          1.0 - tip2.getProgress());
+      if (t1 < t2) return -1;
+      else if (t2 < t1) return 1;
+      else return 0;
+    }
+  }
+  
+  /**
+   * Compares the mean task duration recorded for this taskTracker with the
+   * mean over all trackers' successful tasks so far, for maps and for reduces.
+   * The TT is too slow for speculating when either of its means exceeds the
+   * overall mean by more than slowNodeThreshold standard deviations.
+   * @param taskTracker the name of the TaskTracker we are checking
+   * @return is this TaskTracker slow
+   */
+  protected boolean isSlowTracker(String taskTracker) {
+    if (trackerMapStats.get(taskTracker) != null &&
+        trackerMapStats.get(taskTracker).mean() -
+        mapTaskStats.mean() > mapTaskStats.std()*slowNodeThreshold) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Tracker " + taskTracker + 
+            " declared slow. trackerMapStats.get(taskTracker).mean() :" + trackerMapStats.get(taskTracker).mean() +
+            " mapTaskStats :" + mapTaskStats);
+      }
+      return true;
+    }
+    if (trackerReduceStats.get(taskTracker) != null && 
+        trackerReduceStats.get(taskTracker).mean() -
+        reduceTaskStats.mean() > reduceTaskStats.std()*slowNodeThreshold) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Tracker " + taskTracker + 
+            " declared slow. trackerReduceStats.get(taskTracker).mean() :" + trackerReduceStats.get(taskTracker).mean() +
+            " reduceTaskStats :" + reduceTaskStats);
+      }
+      return true;
+    }
+    return false;
+  }
+  /** Running mean/variance over samples, kept as count, sum and sum of squares. */
+  static class DataStatistics{
+    private int count = 0;
+    private double sum = 0;
+    private double sumSquares = 0;
+    
+    public DataStatistics() {
+    }
+    
+    public DataStatistics(double initNum) {
+      this.count = 1;
+      this.sum = initNum;
+      this.sumSquares = initNum * initNum;
+    }
+    
+    public void add(double newNum) {
+      this.count++;
+      this.sum += newNum;
+      this.sumSquares += newNum * newNum;
+    }
+    /** Replaces one previously added sample (old) with a new value (update). */
+    public void updateStatistics(double old, double update) {
+      sub(old);
+      add(update);
+    }
+    private void sub(double oldNum) {
+      this.count--;
+      this.sum -= oldNum;
+      this.sumSquares -= oldNum * oldNum;
+    }
+    /** @return the mean, or 0 when no samples are held (avoids 0/0 = NaN) */
+    public double mean() {
+      return count == 0 ? 0.0 : sum/count;
+    }
+    /** @return the variance; never negative, and 0 when no samples are held */
+    public double var() {
+      // E(X^2) - E(X)^2; rounding can push it just below 0, so clamp for sqrt
+      return count == 0 ? 0.0 : Math.max(0.0, (sumSquares/count) - mean() * mean());
+    }
+    
+    public double std() {
+      return Math.sqrt(this.var());
+    }
+    
+    public String toString() {
+      return "DataStatistics: count is " + count + ", sum is " + sum + 
+      ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+    }
+    
+  }
   
   private boolean shouldRunOnTaskTracker(String taskTracker) {
     //
@@ -2002,7 +2239,6 @@
                                             TaskStatus status)
   {
     TaskAttemptID taskid = status.getTaskID();
-    int oldNumAttempts = tip.getActiveTasks().size();
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
         
     // Sanity check: is the TIP already complete? 
@@ -2018,10 +2254,9 @@
       }
       return false;
     } 
-
+    boolean wasSpeculating = tip.isSpeculating(); //store this fact
     LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
              " successfully.");          
-
     // Mark the TIP as complete
     tip.completed(taskid);
     resourceEstimator.updateWithCompletedTask(status, tip);
@@ -2059,7 +2294,6 @@
                                 tip.getExecFinishTime(),
                                 status.getCounters()); 
         
-    int newNumAttempts = tip.getActiveTasks().size();
     if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
@@ -2096,12 +2330,11 @@
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
-      // check if this was a sepculative task
-      if (oldNumAttempts > 1) {
-        speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
-      }
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
+      if (hasSpeculativeMaps) {
+        updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats);
+      }
       // remove the completed map from the resp running caches
       retireMap(tip);
       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
@@ -2109,21 +2342,66 @@
       }
     } else {
       runningReduceTasks -= 1;
-      if (oldNumAttempts > 1) {
-        speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
-      }
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
+      if (hasSpeculativeReduces) {
+        updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats);
+      }
       // remove the completed reduces from the running reducers set
       retireReduce(tip);
       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
         this.status.setReduceProgress(1.0f);
       }
     }
-    
+    decrementSpeculativeCount(wasSpeculating, tip);
     return true;
   }
-
+  /** Folds the finished tip's duration into its tracker's stats and into the overall stats. */
+  private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, 
+      Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {
+    float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime();
+    DataStatistics ttStats = 
+      trackerStats.get(ttStatus.getTrackerName());
+    double oldMean = 0.0d;
+    //We maintain the mean of TaskTrackers' means. That way, we get a single
+    //data-point for every tracker (used in the evaluation in isSlowTracker)
+    if (ttStats != null) {
+      oldMean = ttStats.mean();
+      ttStats.add(tipDuration);
+      overallStats.updateStatistics(oldMean, ttStats.mean()); // swap this tracker's old mean for its new one
+    } else {
+      trackerStats.put(ttStatus.getTrackerName(),
+          (ttStats = new DataStatistics(tipDuration)));
+      overallStats.add(tipDuration); // first sample for this tracker: its mean is the duration itself
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added mean of " +ttStats.mean() + " to trackerStats of type "+
+          (tip.isMapTask() ? "Map" : "Reduce") +
+          " on "+ttStatus.getTrackerName()+". DataStatistics is now: " +
+          trackerStats.get(ttStatus.getTrackerName()));
+    }
+  }
+  /** Replaces oldProg with newProg in the running-map (isMap) or running-reduce statistics. */
+  public void updateStatistics(double oldProg, double newProg, boolean isMap) {
+    if (isMap) {   
+      runningMapTaskStats.updateStatistics(oldProg, newProg);
+    } else {
+      runningReduceTaskStats.updateStatistics(oldProg, newProg);
+    }
+  }
+  /** Returns the statistics kept for running maps (isMap) or for running reduces. */
+  public DataStatistics getRunningTaskStatistics(boolean isMap) {
+    if (isMap) {
+      return runningMapTaskStats;
+    } else {
+      return runningReduceTaskStats;
+    }
+  }
+  /** Returns slowTaskThreshold, the standard-deviation multiplier used by TaskInProgress.canBeSpeculated. */
+  public float getSlowTaskThreshold() {
+    return slowTaskThreshold;
+  }
+  
   /**
    * The job is done since all it's component tasks are either
    * successful or have failed.
@@ -2142,7 +2420,7 @@
       if (reduces.length == 0) {
         this.status.setReduceProgress(1.0f);
       }
-      this.finishTime = System.currentTimeMillis();
+      this.finishTime = jobtracker.getClock().getTime();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
@@ -2164,7 +2442,7 @@
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.FAILED,
                                     status.getJobPriority());
-        this.finishTime = System.currentTimeMillis();
+        this.finishTime = jobtracker.getClock().getTime();
         JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
@@ -2172,7 +2450,7 @@
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.KILLED,
                                     status.getJobPriority());
-        this.finishTime = System.currentTimeMillis();
+        this.finishTime = jobtracker.getClock().getTime();
         JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
@@ -2273,6 +2551,21 @@
     terminate(JobStatus.FAILED);
   }
   
+  private void decrementSpeculativeCount(boolean wasSpeculating, 
+      TaskInProgress tip) { // lowers the job's speculative map/reduce counter by one
+    if (wasSpeculating) { // callers sample tip.isSpeculating() before updating the tip
+      if (tip.isMapTask()) {
+        speculativeMapTasks--;
+        LOG.debug("Decrement count. Current speculativeMap task count: " +
+            speculativeMapTasks);
+      } else {
+        speculativeReduceTasks--;
+        LOG.debug("Decremented count. Current speculativeReduce task count: " + 
+            speculativeReduceTasks);
+      }
+    }
+  }
+  
   /**
    * A task assigned to this JobInProgress has reported in as failed.
    * Most of the time, we'll just reschedule execution.  However, after
@@ -2292,9 +2585,11 @@
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
+    boolean wasSpeculating = tip.isSpeculating();
 
     // Mark the taskid as FAILED or KILLED
     tip.incompleteSubTask(taskid, this.status);
+    decrementSpeculativeCount(wasSpeculating, tip);
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
@@ -2477,8 +2772,8 @@
    * @param reason The reason that the task failed
    * @param trackerName The task tracker the task failed on
    */
-  public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, 
-                         TaskStatus.Phase phase, TaskStatus.State state, 
+  public synchronized void failedTask(TaskInProgress tip, TaskAttemptID taskid,
+      String reason, TaskStatus.Phase phase, TaskStatus.State state, 
                          String trackerName) {
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
                                                     taskid,
@@ -2491,10 +2786,10 @@
     // update the actual start-time of the attempt
     TaskStatus oldStatus = tip.getTaskStatus(taskid); 
     long startTime = oldStatus == null
-                     ? System.currentTimeMillis()
+                     ? jobtracker.getClock().getTime()
                      : oldStatus.getStartTime();
     status.setStartTime(startTime);
-    status.setFinishTime(System.currentTimeMillis());
+    status.setFinishTime(jobtracker.getClock().getTime());
     boolean wasComplete = tip.isComplete();
     updateTaskStatus(tip, status);
     boolean isComplete = tip.isComplete();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Jun 16 04:11:14 2009
@@ -138,6 +138,8 @@
   // system files should have 700 permission
   final static FsPermission SYSTEM_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0700); // rwx------
+  
+  private Clock clock;
 
   /**
    * A client tried to submit a job before the Job Tracker was ready.
@@ -165,6 +167,10 @@
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
     
+  public Clock getClock() {
+    return clock;
+  }
+  
   /**
    * Start the JobTracker with given configuration.
    * 
@@ -181,7 +187,7 @@
     JobTracker result = null;
     while (true) {
       try {
-        result = new JobTracker(conf);
+        result = new JobTracker(conf, new Clock());
         result.taskScheduler.setTaskTrackerManager(result);
         break;
       } catch (VersionMismatch e) {
@@ -242,7 +248,7 @@
         try {
           // Every 3 minutes check for any tasks that are overdue
           Thread.sleep(tasktrackerExpiryInterval/3);
-          long now = System.currentTimeMillis();
+          long now = clock.getTime();
           LOG.debug("Starting launching task sweep");
           synchronized (JobTracker.this) {
             synchronized (launchingTasks) {
@@ -295,7 +301,7 @@
     public void addNewTask(TaskAttemptID taskName) {
       synchronized (launchingTasks) {
         launchingTasks.put(taskName, 
-                           System.currentTimeMillis());
+                           clock.getTime());
       }
     }
       
@@ -339,7 +345,7 @@
           synchronized (JobTracker.this) {
             synchronized (taskTrackers) {
               synchronized (trackerExpiryQueue) {
-                long now = System.currentTimeMillis();
+                long now = clock.getTime();
                 TaskTrackerStatus leastRecent = null;
                 while ((trackerExpiryQueue.size() > 0) &&
                        ((leastRecent = trackerExpiryQueue.first()) != null) &&
@@ -405,7 +411,7 @@
         try {
           Thread.sleep(retireJobCheckInterval);
           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
-          long now = System.currentTimeMillis();
+          long now = clock.getTime();
           long retireBefore = now - retireJobInterval;
 
           synchronized (jobs) {
@@ -465,9 +471,9 @@
     long lastUpdated;
     boolean blacklisted; 
 
-    FaultInfo() {
+    FaultInfo(long time) {
       numFaults = 0;
-      lastUpdated = System.currentTimeMillis();
+      lastUpdated = time;
       blacklisted = false;
     }
 
@@ -517,14 +523,15 @@
     void incrementFaults(String hostName) {
       synchronized (potentiallyFaultyTrackers) {
         FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+        long now = clock.getTime();
         if (fi == null) {
-          fi = new FaultInfo();
+          fi = new FaultInfo(now);
           potentiallyFaultyTrackers.put(hostName, fi);
         }
         int numFaults = fi.getFaultCount();
         ++numFaults;
         fi.setFaultCount(numFaults);
-        fi.setLastUpdated(System.currentTimeMillis());
+        fi.setLastUpdated(now);
         if (!fi.isBlacklisted()) {
           if (shouldBlacklist(hostName, numFaults)) {
             LOG.info("Adding " + hostName + " to the blacklist" +
@@ -1028,7 +1035,7 @@
       TaskTrackerStatus ttStatus = 
         new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
                               0 , 0, 0);
-      ttStatus.setLastSeen(System.currentTimeMillis());
+      ttStatus.setLastSeen(clock.getTime());
 
       synchronized (JobTracker.this) {
         synchronized (taskTrackers) {
@@ -1244,7 +1251,7 @@
     }
 
     public void recover() {
-      long recoveryProcessStartTime = System.currentTimeMillis();
+      long recoveryProcessStartTime = clock.getTime();
       if (!shouldRecover()) {
         // clean up jobs structure
         jobsToRecover.clear();
@@ -1312,13 +1319,12 @@
           continue;
         }
       }
-
+      long now = clock.getTime();
       LOG.info("Took a total of " 
-               + StringUtils.formatTime(System.currentTimeMillis() 
+               + StringUtils.formatTime(now 
                                         - recoveryProcessStartTime) 
                + " for recovering filenames of all the jobs from history.");
 
-      long recoveryStartTime = System.currentTimeMillis();
 
       // II. Recover each job
       idIter = jobsToRecover.iterator();
@@ -1375,10 +1381,10 @@
         }
       }
 
-      long recoveryProcessEndTime = System.currentTimeMillis();
+      long recoveryProcessEndTime = clock.getTime();
       LOG.info("Took a total of " 
                + StringUtils.formatTime(recoveryProcessEndTime
-                                        - recoveryStartTime) 
+                                        - now) 
                + " for parsing and recovering all the jobs from history.");
 
       recoveryDuration = recoveryProcessEndTime - recoveryProcessStartTime;
@@ -1565,11 +1571,15 @@
 
   private QueueManager queueManager;
 
+  JobTracker(JobConf conf) throws IOException,InterruptedException{
+    this(conf, new Clock());
+  }
   /**
    * Start the JobTracker process, listen on the indicated port
    */
-  JobTracker(JobConf conf) throws IOException, InterruptedException {
+  JobTracker(JobConf conf, Clock clock) throws IOException, InterruptedException {
     // find the owner of the process
+    this.clock = clock;
     try {
       mrOwner = UnixUserGroupInformation.login(conf);
     } catch (LoginException e) {
@@ -1649,7 +1659,7 @@
         conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
     String infoBindAddress = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
-    this.startTime = System.currentTimeMillis();
+    this.startTime = clock.getTime();
     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
         tmpInfoPort == 0, conf);
     infoServer.setAttribute("job.tracker", this);
@@ -2152,7 +2162,7 @@
     final JobTrackerInstrumentation metrics = getInstrumentation();
     metrics.finalizeJob(conf, id);
     
-    long now = System.currentTimeMillis();
+    long now = clock.getTime();
     
     // mark the job for cleanup at all the trackers
     addJobForCleanup(id);
@@ -2568,7 +2578,7 @@
 
     // First check if the last heartbeat response got through
     String trackerName = status.getTrackerName();
-    long now = System.currentTimeMillis();
+    long now = clock.getTime();
     boolean isBlacklisted = false;
     if (restarted) {
       faultyTrackers.markTrackerHealthy(status.getHost());

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Tue Jun 16 04:11:14 2009
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.net.Node;
@@ -52,9 +53,8 @@
  * **************************************************************
  */
 class TaskInProgress {
-  static final int MAX_TASK_EXECS = 1;
+  static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently  
   int maxTaskAttempts = 4;    
-  static final double SPECULATIVE_GAP = 0.2;
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
 
@@ -74,9 +74,10 @@
   private int numTaskFailures = 0;
   private int numKilledTasks = 0;
   private double progress = 0;
+  private double oldProgressRate;
   private String state = "";
-  private long startTime = 0;
-  private long execStartTime = 0;
+  private long dispatchTime = 0;   // most recent time task attempt given to TT
+  private long execStartTime = 0;  // when we started first task-attempt
   private long execFinishTime = 0;
   private int completes = 0;
   private boolean failed = false;
@@ -220,7 +221,6 @@
    * Initialization common to Map and Reduce
    */
   void init(JobID jobId) {
-    this.startTime = System.currentTimeMillis();
     this.id = new TaskID(jobId, isMapTask() ? TaskType.MAP : TaskType.REDUCE,
         partition);
     this.skipping = startSkipping();
@@ -229,12 +229,19 @@
   ////////////////////////////////////
   // Accessors, info, profiles, etc.
   ////////////////////////////////////
-
+  
   /**
-   * Return the start time
+   * Return the dispatch time
    */
-  public long getStartTime() {
-    return startTime;
+  public long getDispatchTime(){
+    return this.dispatchTime;
+  }
+  
+  /**
+   * Set the dispatch time
+   */
+  public void setDispatchTime(long disTime){
+    this.dispatchTime = disTime;
   }
   
   /**
@@ -399,9 +406,15 @@
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
+      if (isComplete() && !isComplete(taskid)) {
+        addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" +
+            " already SUCCEEDED");
+      }      
     } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
+      addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" +
+           " went to COMMIT_PENDING state earlier");
       close = true; 
     } else {
       close = tasksToKill.keySet().contains(taskid);
@@ -562,6 +575,17 @@
     // but finishTime has to be updated.
     if (!isCleanupAttempt(taskid)) {
       taskStatuses.put(taskid, status);
+      if ((isMapTask() && job.hasSpeculativeMaps()) || 
+          (!isMapTask() && job.hasSpeculativeReduces())) {
+        long now = jobtracker.getClock().getTime();
+        double oldProgRate = getOldProgressRate();
+        double currProgRate = getCurrentProgressRate(now);
+        job.updateStatistics(oldProgRate, currProgRate, isMapTask());
+        //we need to store the current progress rate, so that we can
+        //update statistics accurately the next time we invoke
+        //updateStatistics
+        setProgressRate(currProgRate);
+      }
     } else {
       taskStatuses.get(taskid).statusUpdate(status.getRunState(),
         status.getProgress(), status.getStateString(), status.getPhase(),
@@ -619,7 +643,7 @@
 
       // tasktracker went down and failed time was not reported. 
       if (0 == status.getFinishTime()){
-        status.setFinishTime(System.currentTimeMillis());
+        status.setFinishTime(jobtracker.getClock().getTime());
       }
     }
 
@@ -723,7 +747,7 @@
     //
 
     this.completes++;
-    this.execFinishTime = System.currentTimeMillis();
+    this.execFinishTime = jobtracker.getClock().getTime();
     recomputeProgress();
     
   }
@@ -762,7 +786,7 @@
     }
     this.failed = true;
     killed = true;
-    this.execFinishTime = System.currentTimeMillis();
+    this.execFinishTime = jobtracker.getClock().getTime();
     recomputeProgress();
   }
 
@@ -860,35 +884,39 @@
   }
     
   /**
-   * Return whether the TIP has a speculative task to run.  We
-   * only launch a speculative task if the current TIP is really
-   * far behind, and has been behind for a non-trivial amount of 
-   * time.
+   * Can this task be speculated? This requires that it isn't done or almost
+   * done and that it isn't already being speculatively executed.
+   * 
+   * Added for use by queue scheduling algorithms.
+   * @param currentTime 
    */
-  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
-    //
-    // REMIND - mjc - these constants should be examined
-    // in more depth eventually...
-    //
-      
-    if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
-        (averageProgress - progress >= SPECULATIVE_GAP) &&
-        (currentTime - startTime >= SPECULATIVE_LAG) 
-        && completes == 0 && !isOnlyCommitPending()) {
-      return true;
-    }
-    return false;
+  boolean canBeSpeculated(long currentTime) {
+    DataStatistics taskStats = job.getRunningTaskStatistics(isMapTask());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("activeTasks.size(): " + activeTasks.size() + " "
+          + activeTasks.firstKey() + " task's progressrate: " + 
+          getCurrentProgressRate(currentTime) + 
+          " taskStats : " + taskStats);
+    }
+    return (!skipping && isRunnable() && isRunning() &&
+        activeTasks.size() <= MAX_TASK_EXECS &&
+        currentTime - dispatchTime >= SPECULATIVE_LAG &&
+        completes == 0 && !isOnlyCommitPending() &&
+        (taskStats.mean() - getCurrentProgressRate(currentTime) >
+              taskStats.std() * job.getSlowTaskThreshold()));
   }
-    
+  
+  /**
+   * Is the task currently speculating?
+   */
+  boolean isSpeculating() {
+   return (activeTasks.size() > MAX_TASK_EXECS);
+  }
+  
   /**
    * Return a Task that can be sent to a TaskTracker for execution.
    */
-  public Task getTaskToRun(String taskTracker) throws IOException {
-    if (0 == execStartTime){
-      // assume task starts running now
-      execStartTime = System.currentTimeMillis();
-    }
-
+  public Task getTaskToRun(String taskTracker) throws IOException {   
     // Create the 'taskid'; do not count the 'killed' tasks against the job!
     TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
@@ -903,6 +931,16 @@
       return null;
     }
 
+    //keep track of the last time we started an attempt at this TIP
+    //used to calculate the progress rate of this TIP
+    setDispatchTime(jobtracker.getClock().getTime());
+ 
+    //set this the first time we run a taskAttempt in this TIP
+    //each Task attempt has its own TaskStatus, which tracks that
+    //attempt's execStartTime, thus this startTime is TIP-wide.
+    if (0 == execStartTime){
+      setExecStartTime(dispatchTime);
+    }
     return addRunningTask(taskid, taskTracker);
   }
   
@@ -1084,6 +1122,34 @@
   }
   
   /**
+   * Compare most recent task attempt's dispatch time to current system time so
+   * that task progress rate will slow down as time proceeds even if no progress
+   * is reported for the task. This allows speculative tasks to be launched for
+   * tasks on slow/dead TTs before we realize the TT is dead/slow. Skew isn't
+   * an issue since both times are from the JobTracker's perspective.
+   * @return the progress rate from the active task that is doing best
+   */
+  public double getCurrentProgressRate(long currentTime) {
+    double bestProgressRate = 0;
+    for (TaskStatus ts : taskStatuses.values()){
+      double progressRate = ts.getProgress()/Math.max(1,
+          currentTime - dispatchTime);
+      if ((ts.getRunState() == TaskStatus.State.RUNNING  || 
+          ts.getRunState() == TaskStatus.State.SUCCEEDED) &&
+          progressRate > bestProgressRate){
+        bestProgressRate = progressRate;
+      }
+    }
+    return bestProgressRate;
+  }
+  
+  private void setProgressRate(double rate) {
+    oldProgressRate = rate;
+  }
+  private double getOldProgressRate() {
+    return oldProgressRate;
+  }
+  /**
    * This class keeps the records to be skipped during further executions 
    * based on failed records from all the previous attempts.
    * It also narrow down the skip records if it is more than the 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Tue Jun 16 04:11:14 2009
@@ -50,7 +50,7 @@
   private String stateString;
   private String taskTracker;
     
-  private long startTime; 
+  private long startTime; //in ms
   private long finishTime; 
   private long outputSize;
     
@@ -81,7 +81,9 @@
   public TaskAttemptID getTaskID() { return taskid; }
   public abstract boolean getIsMap();
   public float getProgress() { return progress; }
-  public void setProgress(float progress) { this.progress = progress; } 
+  public void setProgress(float progress) {
+    this.progress = progress;
+  } 
   public State getRunState() { return runState; }
   public String getTaskTracker() {return taskTracker;}
   public void setTaskTracker(String tracker) { this.taskTracker = tracker;}
@@ -279,7 +281,7 @@
   public List<TaskAttemptID> getFetchFailedMaps() {
     return null;
   }
-  
+
   /**
    * Add to the list of maps from which output-fetches failed.
    *  
@@ -310,7 +312,7 @@
    * @param status updated status
    */
   synchronized void statusUpdate(TaskStatus status) {
-    this.progress = status.getProgress();
+    setProgress (status.getProgress());
     this.runState = status.getRunState();
     this.stateString = status.getStateString();
     this.nextRecordRange = status.getNextRecordRange();
@@ -397,7 +399,7 @@
 
   public void readFields(DataInput in) throws IOException {
     this.taskid.readFields(in);
-    this.progress = in.readFloat();
+    setProgress(in.readFloat());
     this.runState = WritableUtils.readEnum(in, State.class);
     this.diagnosticInfo = Text.readString(in);
     this.stateString = Text.readString(in);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Jun 16 04:11:14 2009
@@ -192,6 +192,12 @@
     taskReports = new ArrayList<TaskStatus>();
     resStatus = new ResourceStatus();
   }
+  
+  public TaskTrackerStatus(String trackerName, String host) {
+    this();
+    this.trackerName = trackerName;
+    this.host = host;
+  }
 
   /**
    */

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Jun 16 04:11:14 2009
@@ -45,7 +45,7 @@
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+      super(new JobID("test", ++jobCounter), jobConf, null);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Tue Jun 16 04:11:14 2009
@@ -42,7 +42,7 @@
    
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+      super(new JobID("test", ++jobCounter), jobConf, null);
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
       this.status.setJobPriority(JobPriority.NORMAL);

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Tue Jun 16 04:11:14 2009
@@ -32,7 +32,7 @@
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc);
+    JobInProgress jip = new JobInProgress(jid, jc, null);
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
@@ -64,7 +64,7 @@
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc) {
+    JobInProgress jip = new JobInProgress(jid, jc, null) {
       long getInputLength() {
         return singleMapInputSize*desiredMaps();
       }

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=785065&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Tue Jun 16 04:11:14 2009
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestSpeculativeExecution extends TestCase {
+
+  FakeJobInProgress job;
+  static FakeJobTracker jobTracker;
+  static String jtIdentifier = "test";
+  private static int jobCounter;
+  static class SpecFakeClock extends FakeClock {
+    long SPECULATIVE_LAG = TaskInProgress.SPECULATIVE_LAG;
+    @Override
+    public void advance(long millis) {
+      time += millis + SPECULATIVE_LAG;
+    }
+  };
+  static SpecFakeClock clock;
+  
+  static String trackers[] = new String[] {"tracker_tracker1:1000", 
+      "tracker_tracker2:1000", "tracker_tracker3:1000",
+      "tracker_tracker4:1000", "tracker_tracker5:1000"};
+
+  public static Test suite() {
+    TestSetup setup = 
+      new TestSetup(new TestSuite(TestSpeculativeExecution.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("mapred.job.tracker", "localhost:0");
+        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+        jobTracker = new FakeJobTracker(conf, (clock = new SpecFakeClock()));
+        for (String tracker : trackers) {
+          jobTracker.heartbeat(new TaskTrackerStatus(tracker,
+              JobInProgress.convertTrackerNameToHostName(tracker)), false, 
+              true, false, (short)0);
+        }
+      }
+      protected void tearDown() throws Exception {
+        //TODO: delete the build/test/logs/ dir
+      }
+    };
+    return setup;
+  }
+  
+  /*
+   * This class is required mainly to check the speculative cap
+   * based on cluster size
+   */
+  static class FakeJobTracker extends JobTracker {
+    //initialize max{Map/Reduce} task capacities to twice the clustersize
+    int totalSlots = trackers.length * 4;
+    FakeJobTracker(JobConf conf, Clock clock) throws IOException, 
+    InterruptedException {
+      super(conf, clock);
+    }
+    @Override
+    public ClusterStatus getClusterStatus(boolean detailed) {
+      return new ClusterStatus(trackers.length,
+          0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
+    }
+    public void setNumSlots(int totalSlots) {
+      this.totalSlots = totalSlots;
+    }
+  }
+
+  static class FakeJobInProgress extends JobInProgress {
+    JobClient.RawSplit[] rawSplits;
+    FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
+      super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
+      //initObjects(tracker, numMaps, numReduces);
+    }
+    @Override
+    public synchronized void initTasks() throws IOException {
+      maps = new TaskInProgress[numMapTasks];
+      for (int i = 0; i < numMapTasks; i++) {
+        JobClient.RawSplit split = new JobClient.RawSplit();
+        split.setLocations(new String[0]);
+        maps[i] = new TaskInProgress(getJobID(), "test", 
+            split, jobtracker, getJobConf(), this, i);
+        nonLocalMaps.add(maps[i]);
+      }
+      reduces = new TaskInProgress[numReduceTasks];
+      for (int i = 0; i < numReduceTasks; i++) {
+        reduces[i] = new TaskInProgress(getJobID(), "test", 
+                                        numMapTasks, i, 
+                                        jobtracker, getJobConf(), this);
+        nonRunningReduces.add(reduces[i]);
+      }
+    }
+    private TaskAttemptID findTask(String trackerName, String trackerHost,
+        Collection<TaskInProgress> nonRunningTasks, 
+        Collection<TaskInProgress> runningTasks)
+    throws IOException {
+      TaskInProgress tip = null;
+      Iterator<TaskInProgress> iter = nonRunningTasks.iterator();
+      //look for a non-running task first
+      while (iter.hasNext()) {
+        TaskInProgress t = iter.next();
+        if (t.isRunnable() && !t.isRunning()) {
+          runningTasks.add(t);
+          iter.remove();
+          tip = t;
+          break;
+        }
+      }
+      if (tip == null) {
+        if (getJobConf().getSpeculativeExecution()) {
+          tip = findSpeculativeTask(runningTasks, trackerName, trackerHost);
+        }
+      }
+      if (tip != null) {
+        TaskAttemptID tId = tip.getTaskToRun(trackerName).getTaskID();
+        if (tip.isMapTask()) {
+          scheduleMap(tip);
+        } else {
+          scheduleReduce(tip);
+        }
+        //Set it to RUNNING
+        makeRunning(tId, tip, trackerName);
+        return tId;
+      }
+      return null;
+    }
+    public TaskAttemptID findMapTask(String trackerName)
+    throws IOException {
+      return findTask(trackerName, 
+          JobInProgress.convertTrackerNameToHostName(trackerName),
+          nonLocalMaps, nonLocalRunningMaps);
+    }
+    public TaskAttemptID findReduceTask(String trackerName) 
+    throws IOException {
+      return findTask(trackerName, 
+          JobInProgress.convertTrackerNameToHostName(trackerName),
+          nonRunningReduces, runningReduces);
+    }
+    public void finishTask(TaskAttemptID taskId) {
+      TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          1.0f, TaskStatus.State.SUCCEEDED, "", "", tip.machineWhereTaskRan(taskId), 
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+    private void makeRunning(TaskAttemptID taskId, TaskInProgress tip, 
+        String taskTracker) {
+      addRunningTaskToTIP(tip, taskId, new TaskTrackerStatus(taskTracker,
+          JobInProgress.convertTrackerNameToHostName(taskTracker)), true);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          0.0f, TaskStatus.State.RUNNING, "", "", taskTracker,
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+    public void progressMade(TaskAttemptID taskId, float progress) {
+      TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          progress, TaskStatus.State.RUNNING, "", "", tip.machineWhereTaskRan(taskId), 
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+  }
+
+  public void testIsSlowTracker() throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(10);
+    conf.setNumReduceTasks(0);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);    
+    job.initTasks();
+    //schedule some tasks
+    taskAttemptID[0] = job.findMapTask(trackers[0]);
+    taskAttemptID[1] = job.findMapTask(trackers[0]);
+    taskAttemptID[2] = job.findMapTask(trackers[0]);
+    taskAttemptID[3] = job.findMapTask(trackers[1]);
+    taskAttemptID[4] = job.findMapTask(trackers[1]);
+    taskAttemptID[5] = job.findMapTask(trackers[1]);
+    taskAttemptID[6] = job.findMapTask(trackers[2]);
+    taskAttemptID[7] = job.findMapTask(trackers[2]);
+    taskAttemptID[8] = job.findMapTask(trackers[2]);
+    clock.advance(1000);
+    //Some tasks finish in 1 second (on trackers[0])
+    job.finishTask(taskAttemptID[0]);
+    job.finishTask(taskAttemptID[1]);
+    job.finishTask(taskAttemptID[2]);
+    clock.advance(1000);
+    //Some tasks finish in 2 seconds (on trackers[1])
+    job.finishTask(taskAttemptID[3]);
+    job.finishTask(taskAttemptID[4]);
+    job.finishTask(taskAttemptID[5]);
+    assertEquals("Tracker "+ trackers[0] + " expected to be not slow ",
+        job.isSlowTracker(trackers[0]), false);
+    clock.advance(100000);
+    //After a long time, some tasks finished on trackers[2]
+    job.finishTask(taskAttemptID[6]);
+    job.finishTask(taskAttemptID[7]);
+    job.finishTask(taskAttemptID[8]);
+    assertEquals("Tracker "+ trackers[2] + " expected to be slow ",
+        job.isSlowTracker(trackers[2]), true);
+  }
+  
+  public void testTaskToSpeculate() throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(5);
+    conf.setNumReduceTasks(5);
+    conf.setFloat("mapred.speculative.execution.slowTaskThreshold", 0.5f);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);    
+    job.initTasks();
+    //schedule reduces
+    taskAttemptID[0] = job.findReduceTask(trackers[0]);
+    taskAttemptID[1] = job.findReduceTask(trackers[1]);
+    taskAttemptID[2] = job.findReduceTask(trackers[2]);
+    taskAttemptID[3] = job.findReduceTask(trackers[3]);
+    taskAttemptID[4] = job.findReduceTask(trackers[3]);
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[0]);
+    clock.advance(1000);
+    job.finishTask(taskAttemptID[1]);
+    clock.advance(20000);
+    //we should get a speculative task now
+    taskAttemptID[5] = job.findReduceTask(trackers[4]);
+    assertEquals(taskAttemptID[5].getTaskID().getId(),2);
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[5]);
+    
+    taskAttemptID[5] = job.findReduceTask(trackers[4]);
+    assertEquals(taskAttemptID[5].getTaskID().getId(),3);
+    
+  }
+  
+  /*
+   * Tests the fact that we choose tasks with lesser progress
+   * among the possible candidates for speculation
+   */
+  public void testTaskLATEScheduling() throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(5);
+    conf.setNumReduceTasks(0);
+    conf.setFloat("mapred.speculative.execution.slowTaskThreshold", 0.5f);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+
+    taskAttemptID[0] = job.findMapTask(trackers[0]);
+    taskAttemptID[1] = job.findMapTask(trackers[1]);
+    taskAttemptID[2] = job.findMapTask(trackers[2]);
+    taskAttemptID[3] = job.findMapTask(trackers[3]);
+    clock.advance(2000);
+    job.finishTask(taskAttemptID[0]);
+    job.finishTask(taskAttemptID[1]);
+    job.finishTask(taskAttemptID[2]);
+    clock.advance(28000);
+    taskAttemptID[4] = job.findMapTask(trackers[3]);
+    clock.advance(5000);
+    //by doing the above clock adjustments, we bring the progress rate of 
+    //taskID 3 lower than 4. For taskID 3, the rate is 85/35000
+    //and for taskID 4, the rate is 20/5000. But when we ask for a spec task
+    //now, we should get back taskID 4 (since that is expected to complete
+    //later than taskID 3).
+    job.progressMade(taskAttemptID[3], 0.85f);
+    job.progressMade(taskAttemptID[4], 0.20f);
+    taskAttemptID[5] = job.findMapTask(trackers[4]);
+    assertEquals(taskAttemptID[5].getTaskID().getId(),4);
+  }
+  
+  /*
+   * Tests the fact that we only launch a limited number of speculative tasks,
+   * even though we have a lot of tasks in RUNNING state
+   */
+  public void testAtSpeculativeCap() throws IOException {
+    //The expr which is evaluated for determining whether atSpeculativeCap should
+    //return true or false is
+    //(#speculative-tasks < max(10, 0.01*#slots, 0.1*#running-tasks))
+    
+    //Tests the fact that the max tasks launched is 0.1 * #running-tasks
+    assertEquals(speculativeCap(1200,800,20), 40);
+    //Tests the fact that the max tasks launched is 10
+    assertEquals(speculativeCap(1200,1150,20), 10);
+    //Tests the fact that the max tasks launched is 0.01 * #slots
+    assertEquals(speculativeCap(1200,1150,2000), 20);
+  }
+  
+  private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
+  throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(totalTasks);
+    conf.setNumReduceTasks(0);
+    jobTracker.setNumSlots(slots);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    int i;
+    for (i = 0; i < totalTasks; i++) {
+      taskAttemptID[i] = job.findMapTask(trackers[0]);
+    }
+    clock.advance(5000);
+    for (i = 0; i < numEarlyComplete; i++) {
+      job.finishTask(taskAttemptID[i]);
+    }
+    for (i = numEarlyComplete; i < totalTasks; i++) {
+      job.progressMade(taskAttemptID[i], 0.85f);
+    }
+    clock.advance(50000);
+    for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
+      taskAttemptID[i] = job.findMapTask(trackers[1]);
+      clock.advance(2000);
+      if (taskAttemptID[i] != null) {
+        //add some good progress constantly for the different task-attempts so that
+        //the tasktracker doesn't get into the slow trackers category
+        job.progressMade(taskAttemptID[i], 0.99f);
+      } else {
+        break;
+      }
+    }
+    return i;
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=785065&r1=785064&r2=785065&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Jun 16 04:11:14 2009
@@ -24,9 +24,6 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.FileNotFoundException;
 import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.Enumeration;
@@ -35,7 +32,6 @@
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -650,7 +646,18 @@
 
     return job;
   }
+  static class FakeClock extends Clock {
+    long time = 0;
+    
+    public void advance(long millis) {
+      time += millis;
+    }
 
+    @Override
+    long getTime() {
+      return time;
+    }
+  }
   // Mapper that fails
   static class FailMapper extends MapReduceBase implements
       Mapper<WritableComparable, Writable, WritableComparable, Writable> {



Mime
View raw message