hadoop-mapreduce-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r1157290 [2/4] - in /hadoop/common/trunk/mapreduce: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/counters/ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/java/org/apa...
Date Fri, 12 Aug 2011 23:25:52 GMT
Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig Fri Aug 12 23:25:51 2011
@@ -0,0 +1,3729 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * JobInProgress maintains all the info for keeping a Job on the straight and
+ * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
+ * tables for doing bookkeeping of its Tasks.
+ */
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class JobInProgress {
+  /**
+   * Used when a kill is issued to a job that is initializing.
+   */
+  static class KillInterruptedException extends InterruptedException {
+    private static final long serialVersionUID = 1L;
+    public KillInterruptedException(String msg) {
+      super(msg);
+    }
+  }
+  
+  static final Log LOG = LogFactory.getLog(JobInProgress.class);
+    
+  JobProfile profile;
+  JobStatus status;
+  Path jobFile = null;
+  Path localJobFile = null;
+
+  TaskInProgress maps[] = new TaskInProgress[0];
+  TaskInProgress reduces[] = new TaskInProgress[0];
+  TaskInProgress cleanup[] = new TaskInProgress[0];
+  TaskInProgress setup[] = new TaskInProgress[0];
+  int numMapTasks = 0;
+  int numReduceTasks = 0;
+  final long memoryPerMap;
+  final long memoryPerReduce;
+  volatile int numSlotsPerMap = 1;
+  volatile int numSlotsPerReduce = 1;
+  final int maxTaskFailuresPerTracker;
+  
+  // Counters to track currently running/finished/failed Map/Reduce task-attempts
+  int runningMapTasks = 0;
+  int runningReduceTasks = 0;
+  int finishedMapTasks = 0;
+  int finishedReduceTasks = 0;
+  int failedMapTasks = 0; 
+  int failedReduceTasks = 0;
+  
+  static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
+  int completedMapsForReduceSlowstart = 0;
+  
+  // runningMapTasks includes speculative tasks, so we need to capture
+  // speculative tasks separately
+  int speculativeMapTasks = 0;
+  int speculativeReduceTasks = 0;
+  
+  int mapFailuresPercent = 0;
+  int reduceFailuresPercent = 0;
+  int failedMapTIPs = 0;
+  int failedReduceTIPs = 0;
+  private volatile boolean launchedCleanup = false;
+  private volatile boolean launchedSetup = false;
+  private volatile boolean jobKilled = false;
+  private volatile boolean jobFailed = false;
+  private final boolean jobSetupCleanupNeeded;
+  private final boolean taskCleanupNeeded;
+
+  JobPriority priority = JobPriority.NORMAL;
+  protected JobTracker jobtracker;
+  
+  protected Credentials tokenStorage;
+  
+  JobHistory jobHistory;
+
+  // NetworkTopology Node to the set of TIPs
+  Map<Node, List<TaskInProgress>> nonRunningMapCache;
+  
+  // Map of NetworkTopology Node to set of running TIPs
+  Map<Node, Set<TaskInProgress>> runningMapCache;
+
+  // A list of non-local non-running maps
+  List<TaskInProgress> nonLocalMaps;
+
+  // A set of non-local running maps
+  Set<TaskInProgress> nonLocalRunningMaps;
+
+  // A list of non-running reduce TIPs
+  List<TaskInProgress> nonRunningReduces;
+
+  // A set of running reduce TIPs
+  Set<TaskInProgress> runningReduces;
+  
+  // A list of cleanup tasks for the map task attempts, to be launched
+  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
+  
+  // A list of cleanup tasks for the reduce task attempts, to be launched
+  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
+
+  int maxLevel;
+
+  /**
+   * A special value indicating that 
+   * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
+   * schedule any available map tasks for this job, including speculative tasks.
+   */
+  int anyCacheLevel;
+  
+  /**
+   * A special value indicating that 
+   * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
+   * schedule only off-switch and speculative map tasks for this job.
+   */
+  private static final int NON_LOCAL_CACHE_LEVEL = -1;
+
+  private int taskCompletionEventTracker = 0; 
+  List<TaskCompletionEvent> taskCompletionEvents;
+    
+  // The maximum percentage of trackers in the cluster added to the 'blacklist'.
+  private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
+  
+  // The maximum percentage of fetch failures allowed for a map 
+  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
+  
+  // No. of tasktrackers in the cluster
+  private volatile int clusterSize = 0;
+  
+  // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker()
+  // tasks have failed
+  private volatile int flakyTaskTrackers = 0;
+  // Map of trackerHostName -> no. of task failures
+  private Map<String, Integer> trackerToFailuresMap = 
+    new TreeMap<String, Integer>();
+    
+  //Confine estimation algorithms to an "oracle" class that JIP queries.
+  ResourceEstimator resourceEstimator; 
+  
+  long startTime;
+  long launchTime;
+  long finishTime;
+
+  // First launch time for each task type
+  final Map<TaskType, Long> firstTaskLaunchTimes =
+      new EnumMap<TaskType, Long>(TaskType.class);
+  
+  // Indicates how many times the job got restarted
+  private final int restartCount;
+
+  JobConf conf;
+  protected AtomicBoolean tasksInited = new AtomicBoolean(false);
+  private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
+
+  LocalFileSystem localFs;
+  FileSystem fs;
+  String user;
+  JobID jobId;
+  volatile private boolean hasSpeculativeMaps;
+  volatile private boolean hasSpeculativeReduces;
+  long inputLength = 0;
+  
+  Counters jobCounters = new Counters();
+  
+  // Maximum no. of fetch-failure notifications after which map task is killed
+  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+
+  // Don't lower speculativeCap below one TT's worth (for small clusters)
+  private static final int MIN_SPEC_CAP = 10;
+  
+  private static final float MIN_SLOTS_CAP = 0.01f;
+  
+  // Map of mapTaskId -> no. of fetch failures
+  private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
+    new TreeMap<TaskAttemptID, Integer>();
+
+  private Object schedulingInfo;
+  private String submitHostName;
+  private String submitHostAddress;
+
+  //thresholds for speculative execution
+  float slowTaskThreshold;
+  float speculativeCap;
+  float slowNodeThreshold; //standard deviations
+
+  //Statistics are maintained for a couple of things
+  //mapTaskStats is used for maintaining statistics about
+  //the completion time of map tasks on the trackers. On a per
+  //tracker basis, the mean time for task completion is maintained
+  private DataStatistics mapTaskStats = new DataStatistics();
+  //reduceTaskStats is used for maintaining statistics about
+  //the completion time of reduce tasks on the trackers. On a per
+  //tracker basis, the mean time for task completion is maintained
+  private DataStatistics reduceTaskStats = new DataStatistics();
+  //trackerMapStats is used to maintain a mapping from the tracker to
+  //the statistics about completion time of map tasks
+  private Map<String,DataStatistics> trackerMapStats = 
+    new HashMap<String,DataStatistics>();
+  //trackerReduceStats is used to maintain a mapping from the tracker to
+  //the statistics about completion time of reduce tasks
+  private Map<String,DataStatistics> trackerReduceStats = 
+    new HashMap<String,DataStatistics>();
+  //runningMapStats used to maintain the RUNNING map tasks' statistics 
+  private DataStatistics runningMapTaskStats = new DataStatistics();
+  //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
+  private DataStatistics runningReduceTaskStats = new DataStatistics();
+ 
+  private static class FallowSlotInfo {
+    long timestamp;
+    int numSlots;
+    
+    public FallowSlotInfo(long timestamp, int numSlots) {
+      this.timestamp = timestamp;
+      this.numSlots = numSlots;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+    }
+
+    public int getNumSlots() {
+      return numSlots;
+    }
+
+    public void setNumSlots(int numSlots) {
+      this.numSlots = numSlots;
+    }
+  }
+  
+  private Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps = 
+    new HashMap<TaskTracker, FallowSlotInfo>();
+  private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
+    new HashMap<TaskTracker, FallowSlotInfo>();
+  private Path jobSubmitDir = null;
+  
+  /**
+   * Create an almost empty JobInProgress, which can be used only for tests
+   */
+  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
+    this.conf = conf;
+    this.jobId = jobid;
+    this.numMapTasks = conf.getNumMapTasks();
+    this.numReduceTasks = conf.getNumReduceTasks();
+    this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
+    this.anyCacheLevel = this.maxLevel+1;
+    this.jobtracker = tracker;
+    this.restartCount = 0;
+    this.profile = new JobProfile(conf.getUser(), jobid, "", "", 
+                                  conf.getJobName(), conf.getQueueName());
+
+    this.memoryPerMap = conf.getMemoryForMapTask();
+    this.memoryPerReduce = conf.getMemoryForReduceTask();
+
+    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
+    
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.resourceEstimator = new ResourceEstimator(this);
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, 
+        this.profile.getUser(), this.profile.getJobName(), 
+        this.profile.getJobFile(), "");
+    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
+        numMapTasks + numReduceTasks + 10);
+    
+    this.slowTaskThreshold = Math.max(0.0f,
+        conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
+    this.speculativeCap = conf.getFloat(
+        MRJobConfig.SPECULATIVECAP,0.1f);
+    this.slowNodeThreshold = conf.getFloat(
+        MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
+    this.jobSetupCleanupNeeded = conf.getBoolean(
+        MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+    this.taskCleanupNeeded = conf.getBoolean(
+        MRJobConfig.TASK_CLEANUP_NEEDED, true);
+    if (tracker != null) { // Some mock tests have null tracker
+      this.jobHistory = tracker.getJobHistory();
+    }
+    this.tokenStorage = null;
+  }
+  
+  JobInProgress(JobConf conf) {
+    restartCount = 0;
+    jobSetupCleanupNeeded = false;
+    taskCleanupNeeded = true;
+
+    this.memoryPerMap = conf.getMemoryForMapTask();
+    this.memoryPerReduce = conf.getMemoryForReduceTask();
+
+    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+  }
+  
+  /**
+   * Create a JobInProgress with the given job file, plus a handle
+   * to the tracker.
+   */
+  public JobInProgress(JobTracker jobtracker, 
+                       final JobConf default_conf, int rCount,
+                       JobInfo jobInfo,
+                       Credentials ts
+                      ) throws IOException, InterruptedException {
+    try {
+      this.restartCount = rCount;
+      this.jobId = JobID.downgrade(jobInfo.getJobID());
+      String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
+          + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
+      this.jobtracker = jobtracker;
+      this.jobHistory = jobtracker.getJobHistory();
+      this.startTime = System.currentTimeMillis();
+
+      this.localFs = jobtracker.getLocalFileSystem();
+      this.tokenStorage = ts;
+      // use the user supplied token to add user credentials to the conf
+      jobSubmitDir = jobInfo.getJobSubmitDir();
+      user = jobInfo.getUser().toString();
+
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+      if (ts != null) {
+        for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+          ugi.addToken(token);
+        }
+      }
+
+      fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        public FileSystem run() throws IOException {
+          return jobSubmitDir.getFileSystem(default_conf);
+        }
+      });
+      this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR + "/"
+          + this.jobId + ".xml");
+
+      jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+      fs.copyToLocalFile(jobFile, localJobFile);
+      conf = new JobConf(localJobFile);
+      if (conf.getUser() == null) {
+        this.conf.setUser(user);
+      }
+      if (!conf.getUser().equals(user)) {
+        String desc = "The username " + conf.getUser() + " obtained from the "
+            + "conf doesn't match the username " + user + " the user "
+            + "authenticated as";
+        AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(),
+            conf.getUser(), jobId.toString(), desc);
+        throw new IOException(desc);
+      }
+
+      String userGroups[] = ugi.getGroupNames();
+      String primaryGroup = (userGroups.length > 0) ? userGroups[0] : null;
+      if (primaryGroup != null) {
+        conf.set("group.name", primaryGroup);
+      }
+
+      this.priority = conf.getJobPriority();
+      this.profile = new JobProfile(conf.getUser(), this.jobId, jobFile
+          .toString(), url, conf.getJobName(), conf.getQueueName());
+      this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP,
+          profile.getUser(), profile.getJobName(), profile.getJobFile(),
+          profile.getURL().toString());
+      this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
+      status.setStartTime(startTime);
+      this.status.setJobPriority(this.priority);
+
+      this.numMapTasks = conf.getNumMapTasks();
+      this.numReduceTasks = conf.getNumReduceTasks();
+
+      this.memoryPerMap = conf.getMemoryForMapTask();
+      this.memoryPerReduce = conf.getMemoryForReduceTask();
+
+      this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
+          numMapTasks + numReduceTasks + 10);
+      JobContext jobContext = new JobContextImpl(conf, jobId);
+      this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
+      this.taskCleanupNeeded = jobContext.getTaskCleanupNeeded();
+
+      // Construct the jobACLs
+      status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
+
+      this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+      this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+      this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
+      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+      this.maxLevel = jobtracker.getNumTaskCacheLevels();
+      this.anyCacheLevel = this.maxLevel + 1;
+      this.nonLocalMaps = new LinkedList<TaskInProgress>();
+      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+      this.nonRunningReduces = new LinkedList<TaskInProgress>();
+      this.runningReduces = new LinkedHashSet<TaskInProgress>();
+      this.resourceEstimator = new ResourceEstimator(this);
+      this.submitHostName = conf.getJobSubmitHostName();
+      this.submitHostAddress = conf.getJobSubmitHostAddress();
+
+      this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
+          MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
+      this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
+      this.slowNodeThreshold = conf.getFloat(
+          MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD, 1.0f);
+      // register job's tokens for renewal
+      DelegationTokenRenewal.registerDelegationTokensForRenewal(jobInfo
+          .getJobID(), ts, jobtracker.getConf());
+    } finally {
+      // close all FileSystems that were created above for the current user
+      // At this point, this constructor is called in the context of an RPC, and
+      // hence the "current user" is actually referring to the kerberos
+      // authenticated user (if security is ON).
+      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+    }
+  }
+    
+  private void printCache (Map<Node, List<TaskInProgress>> cache) {
+    LOG.info("The taskcache info:");
+    for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
+      List <TaskInProgress> tips = n.getValue();
+      LOG.info("Cached TIPs on node: " + n.getKey());
+      for (TaskInProgress tip : tips) {
+        LOG.info("tip : " + tip.getTIPId());
+      }
+    }
+  }
+  
+  Map<Node, List<TaskInProgress>> createCache(
+                         TaskSplitMetaInfo[] splits, int maxLevel) {
+    Map<Node, List<TaskInProgress>> cache = 
+      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
+    
+    for (int i = 0; i < splits.length; i++) {
+      String[] splitLocations = splits[i].getLocations();
+      if (splitLocations.length == 0) {
+        nonLocalMaps.add(maps[i]);
+        continue;
+      }
+
+      for(String host: splitLocations) {
+        Node node = jobtracker.resolveAndAddToTopology(host);
+        LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
+        for (int j = 0; j < maxLevel; j++) {
+          List<TaskInProgress> hostMaps = cache.get(node);
+          if (hostMaps == null) {
+            hostMaps = new ArrayList<TaskInProgress>();
+            cache.put(node, hostMaps);
+            hostMaps.add(maps[i]);
+          }
+          //check whether the hostMaps already contains an entry for a TIP
+          //This will be true for nodes that are racks and multiple nodes in
+          //the rack contain the input for a tip. Note that if it already
+          //exists in the hostMaps, it must be the last element there since
+          //we process one TIP at a time sequentially in the split-size order
+          if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
+            hostMaps.add(maps[i]);
+          }
+          node = node.getParent();
+        }
+      }
+    }
+    return cache;
+  }
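+  // Illustrative note (editorial): for a split located on /rack1/host1 with
+  // maxLevel = 2, createCache registers the TIP under both the host1 node
+  // and the rack1 node, so the scheduler can fall back from node-local to
+  // rack-local assignment by walking up the topology one level at a time.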
+  
+  /**
+   * Check if the job has been initialized.
+   * @return <code>true</code> if the job has been initialized, 
+   *         <code>false</code> otherwise
+   */
+  public boolean inited() {
+    return tasksInited.get();
+  }
+  
+  /**
+   * Get the user for the job
+   */
+  public String getUser() {
+    return user;
+  }
+
+  boolean getMapSpeculativeExecution() {
+    return hasSpeculativeMaps;
+  }
+  
+  boolean getReduceSpeculativeExecution() {
+    return hasSpeculativeReduces;
+  }
+  
+  long getMemoryForMapTask() {
+    return memoryPerMap;
+  }
+  
+  long getMemoryForReduceTask() {
+    return memoryPerReduce;
+  }
+  
+  /**
+   * Get the number of slots required to run a single map task-attempt.
+   * @return the number of slots required to run a single map task-attempt
+   */
+  int getNumSlotsPerMap() {
+    return numSlotsPerMap;
+  }
+
+  /**
+   * Set the number of slots required to run a single map task-attempt.
+   * This is typically set by schedulers which support high-ram jobs.
+   * @param numSlotsPerMap the number of slots required to run a single map
+   *                       task-attempt
+   */
+  void setNumSlotsPerMap(int numSlotsPerMap) {
+    this.numSlotsPerMap = numSlotsPerMap;
+  }
+
+  /**
+   * Get the number of slots required to run a single reduce task-attempt.
+   * @return the number of slots required to run a single reduce task-attempt
+   */
+  int getNumSlotsPerReduce() {
+    return numSlotsPerReduce;
+  }
+
+  /**
+   * Set the number of slots required to run a single reduce task-attempt.
+   * This is typically set by schedulers which support high-ram jobs.
+   * @param numSlotsPerReduce the number of slots required to run a single
+   *                          reduce task-attempt
+   */
+  void setNumSlotsPerReduce(int numSlotsPerReduce) {
+    this.numSlotsPerReduce = numSlotsPerReduce;
+  }
+
+  /**
+   * Construct the splits, etc.  This is invoked from an async
+   * thread so that split-computation doesn't block anyone. Only the 
+   * {@link JobTracker} should invoke this api. Look 
+   * at {@link JobTracker#initJob(JobInProgress)} for more details.
+   */
+  public synchronized void initTasks() 
+  throws IOException, KillInterruptedException, UnknownHostException {
+    if (tasksInited.get() || isComplete()) {
+      return;
+    }
+    synchronized(jobInitKillStatus){
+      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
+        return;
+      }
+      jobInitKillStatus.initStarted = true;
+    }
+
+    LOG.info("Initializing " + jobId);
+
+    logSubmissionToJobHistory();
+    
+    // log the job priority
+    setPriority(this.priority);
+    
+    //
+    // generate security keys needed by Tasks
+    //
+    generateAndStoreTokens();
+    
+    //
+    // read input splits and create a map task per split
+    //
+    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+    numMapTasks = taskSplitMetaInfo.length;
+
+    checkTaskLimits();
+
+    // Sanity check the locations so we don't create/initialize unnecessary tasks
+    for (TaskSplitMetaInfo split : taskSplitMetaInfo) {
+      NetUtils.verifyHostnames(split.getLocations());
+    }
+
+    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
+    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
+
+    createMapTasks(jobFile.toString(), taskSplitMetaInfo);
+    
+    if (numMapTasks > 0) { 
+      nonRunningMapCache = createCache(taskSplitMetaInfo,
+          maxLevel);
+    }
+        
+    // set the launch time
+    this.launchTime = JobTracker.getClock().getTime();
+
+    createReduceTasks(jobFile.toString());
+    
+    // Calculate the minimum number of maps to be complete before 
+    // we should start scheduling reduces
+    completedMapsForReduceSlowstart = 
+      (int)Math.ceil(
+          (conf.getFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
+                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
+           numMapTasks));
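+    // Illustrative note (editorial): with the default slowstart fraction of
+    // 0.05 and, say, 200 map tasks, ceil(0.05 * 200) = 10 maps must complete
+    // before reduces are scheduled.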
+    
+    initSetupCleanupTasks(jobFile.toString());
+    
+    synchronized(jobInitKillStatus){
+      jobInitKillStatus.initDone = true;
+      if(jobInitKillStatus.killed) {
+        //setup not launched so directly terminate
+        throw new KillInterruptedException("Job " + jobId + " killed in init");
+      }
+    }
+    
+    tasksInited.set(true);
+    JobInitedEvent jie = new JobInitedEvent(
+        profile.getJobID(),  this.launchTime,
+        numMapTasks, numReduceTasks,
+        JobStatus.getJobRunState(JobStatus.PREP));
+    
+    jobHistory.logEvent(jie, jobId);
+   
+    // Log the number of map and reduce tasks
+    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks 
+             + " map tasks and " + numReduceTasks + " reduce tasks.");
+  }
+
+  // Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup);
+  // otherwise returns false.
+  synchronized boolean isJobEmpty() {
+    return maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded;
+  }
+  
+  synchronized boolean isSetupCleanupRequired() {
+    return jobSetupCleanupNeeded;
+  }
+
+  // Should be called once the init is done. This will complete the job 
+  // because the job is empty (0 maps, 0 reduces and no setup-cleanup).
+  synchronized void completeEmptyJob() {
+    jobComplete();
+  }
+
+  synchronized void completeSetup() {
+    setupComplete();
+  }
+
+  void logSubmissionToJobHistory() throws IOException {
+    // log job info
+    String username = conf.getUser();
+    if (username == null) { username = ""; }
+    String jobname = conf.getJobName();
+    String jobQueueName = conf.getQueueName();
+
+    setUpLocalizedJobConf(conf, jobId);
+    jobHistory.setupEventWriter(jobId, conf);
+    JobSubmittedEvent jse =
+        new JobSubmittedEvent(jobId, jobname, username, this.startTime,
+            jobFile.toString(), status.getJobACLs(), jobQueueName);
+    jobHistory.logEvent(jse, jobId);
+    
+  }
+
+  TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) 
+  throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo = 
+      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
+    return allTaskSplitMetaInfo;
+  }
+
+  /**
+   * If the number of tasks is greater than the configured value,
+   * throw an exception that will fail job initialization
+   */
+  void checkTaskLimits() throws IOException {
+    int maxTasks = jobtracker.getMaxTasksPerJob();
+    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
+      throw new IOException(
+                "The number of tasks for this job " + 
+                (numMapTasks + numReduceTasks) +
+                " exceeds the configured limit " + maxTasks);
+    }
+  }
+
+  synchronized void createMapTasks(String jobFile,
+                                   TaskSplitMetaInfo[] splits) {
+    maps = new TaskInProgress[numMapTasks];
+    for(int i=0; i < numMapTasks; ++i) {
+      inputLength += splits[i].getInputDataLength();
+      maps[i] = new TaskInProgress(jobId, jobFile, 
+                                   splits[i], 
+                                   jobtracker, conf, this, 
+                                   i, numSlotsPerMap);
+    }
+    LOG.info("Input size for job " + jobId + " = " + inputLength
+        + ". Number of splits = " + splits.length);
+
+  }
+
+  synchronized void createReduceTasks(String jobFile) {
+    this.reduces = new TaskInProgress[numReduceTasks];
+    for (int i = 0; i < numReduceTasks; i++) {
+      reduces[i] = new TaskInProgress(jobId, jobFile, 
+                                      numMapTasks, i, 
+                                      jobtracker, conf, 
+                                      this, numSlotsPerReduce);
+      nonRunningReduces.add(reduces[i]);
+    }
+  }
+
+  
+  synchronized void initSetupCleanupTasks(String jobFile) {
+    if (!jobSetupCleanupNeeded) {
+      LOG.info("Setup/Cleanup not needed for job " + jobId);
+      // nothing to initialize
+      return;
+    }
+    // create two cleanup tips, one map and one reduce.
+    cleanup = new TaskInProgress[2];
+
+    // cleanup map tip. This map doesn't use any splits. Just assign an empty
+    // split.
+    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
+    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
+            jobtracker, conf, this, numMapTasks, 1);
+    cleanup[0].setJobCleanupTask();
+
+    // cleanup reduce tip.
+    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                       numReduceTasks, jobtracker, conf, this, 1);
+    cleanup[1].setJobCleanupTask();
+
+    // create two setup tips, one map and one reduce.
+    setup = new TaskInProgress[2];
+
+    // setup map tip. This map doesn't use any split. Just assign an empty
+    // split.
+    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
+            jobtracker, conf, this, numMapTasks + 1, 1);
+    setup[0].setJobSetupTask();
+
+    // setup reduce tip.
+    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                       numReduceTasks + 1, jobtracker, conf, this, 1);
+    setup[1].setJobSetupTask();
+  }
+  
+  void setupComplete() {
+    status.setSetupProgress(1.0f);
+    if (this.status.getRunState() == JobStatus.PREP) {
+      changeStateTo(JobStatus.RUNNING);
+      JobStatusChangedEvent jse = 
+        new JobStatusChangedEvent(profile.getJobID(),
+         JobStatus.getJobRunState(JobStatus.RUNNING));
+      jobHistory.logEvent(jse, profile.getJobID());
+    }
+  }
+
+  /////////////////////////////////////////////////////
+  // Accessors for the JobInProgress
+  /////////////////////////////////////////////////////
+  public JobProfile getProfile() {
+    return profile;
+  }
+  public JobStatus getStatus() {
+    return status;
+  }
+  public synchronized long getLaunchTime() {
+    return launchTime;
+  }
+  Map<TaskType, Long> getFirstTaskLaunchTimes() {
+    return firstTaskLaunchTimes;
+  }
+  public long getStartTime() {
+    return startTime;
+  }
+  public long getFinishTime() {
+    return finishTime;
+  }
+  public int desiredMaps() {
+    return numMapTasks;
+  }
+  public synchronized int finishedMaps() {
+    return finishedMapTasks;
+  }
+  public int desiredReduces() {
+    return numReduceTasks;
+  }
+  public synchronized int runningMaps() {
+    return runningMapTasks;
+  }
+  public synchronized int runningReduces() {
+    return runningReduceTasks;
+  }
+  public synchronized int finishedReduces() {
+    return finishedReduceTasks;
+  }
+  public synchronized int pendingMaps() {
+    return numMapTasks - runningMapTasks - failedMapTIPs - 
+    finishedMapTasks + speculativeMapTasks;
+  }
+  public synchronized int pendingReduces() {
+    return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
+    finishedReduceTasks + speculativeReduceTasks;
+  }
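+  // Illustrative note (editorial): the pending counts add
+  // speculativeMapTasks/speculativeReduceTasks back in because the running
+  // counts include speculative attempts. For example, 10 map TIPs with 3
+  // finished and 4 attempts running (1 of them speculative) gives
+  // 10 - 4 - 0 - 3 + 1 = 4 pending map TIPs.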
+ 
+  public int getNumSlotsPerTask(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return numSlotsPerMap;
+    } else if (taskType == TaskType.REDUCE) {
+      return numSlotsPerReduce;
+    } else {
+      return 1;
+    }
+  }
+  public JobPriority getPriority() {
+    return this.priority;
+  }
+  public void setPriority(JobPriority priority) {
+    if(priority == null) {
+      priority = JobPriority.NORMAL;
+    }
+    synchronized (this) {
+      this.priority = priority;
+      status.setJobPriority(priority);
+      // log and change to the job's priority
+      JobPriorityChangeEvent prEvent = 
+        new JobPriorityChangeEvent(jobId, priority);
+       
+      jobHistory.logEvent(prEvent, jobId);
+      
+    }
+  }
+
+  // Update the job start/launch time (upon restart) and log to history
+  synchronized void updateJobInfo(long startTime, long launchTime) {
+    // log and change to the job's start/launch time
+    this.startTime = startTime;
+    this.launchTime = launchTime;
+    JobInfoChangeEvent event = 
+      new JobInfoChangeEvent(jobId, startTime, launchTime);
+     
+    jobHistory.logEvent(event, jobId);
+    
+  }
+
+  /**
+   * Get the number of times the job has restarted
+   */
+  int getNumRestarts() {
+    return restartCount;
+  }
+  
+  long getInputLength() {
+    return inputLength;
+  }
+ 
+  boolean isCleanupLaunched() {
+    return launchedCleanup;
+  }
+
+  boolean isSetupLaunched() {
+    return launchedSetup;
+  }
+
+  /** 
+   * Get all the tasks of the desired type in this job.
+   * @param type {@link TaskType} of the tasks required
+   * @return An array of {@link TaskInProgress} matching the given type. 
+   *         Returns an empty array if no tasks are found for the given type.  
+   */
+  TaskInProgress[] getTasks(TaskType type) {
+    TaskInProgress[] tasks = null;
+    switch (type) {
+      case MAP:
+      {
+        tasks = maps;
+      }
+      break;
+      case REDUCE:
+      {
+        tasks = reduces;
+      }
+      break;
+      case JOB_SETUP: 
+      {
+        tasks = setup;
+      }
+      break;
+      case JOB_CLEANUP:
+      {
+        tasks = cleanup;
+      }
+      break;
+      default:
+      {
+          tasks = new TaskInProgress[0];
+      }
+      break;
+    }
+    return tasks;
+  }
+
+  /**
+   * Return the set of non-local running map tasks.
+   * @return the non-local running map TIPs
+   */
+  Set<TaskInProgress> getNonLocalRunningMaps()
+  {
+    return nonLocalRunningMaps;
+  }
+  
+  /**
+   * Return the cache of running map tasks, keyed by topology node.
+   * @return the running map cache
+   */
+  Map<Node, Set<TaskInProgress>> getRunningMapCache()
+  {
+    return runningMapCache;
+  }
+  
+  /**
+   * Return the set of running reduce tasks.
+   * @return the running reduce TIPs
+   */
+  Set<TaskInProgress> getRunningReduces()
+  {
+    return runningReduces;
+  }
+  
+  /**
+   * Get the job configuration
+   * @return the job's configuration
+   */
+  JobConf getJobConf() {
+    return conf;
+  }
+    
+  /**
+   * Return a vector of completed TaskInProgress objects
+   */
+  public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
+                                                      boolean shouldBeComplete) {
+    
+    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
+    TaskInProgress tips[] = null;
+    if (shouldBeMap) {
+      tips = maps;
+    } else {
+      tips = reduces;
+    }
+    for (int i = 0; i < tips.length; i++) {
+      if (tips[i].isComplete() == shouldBeComplete) {
+        results.add(tips[i]);
+      }
+    }
+    return results;
+  }
+  
+  /**
+   * Return a vector of cleanup TaskInProgress objects
+   */
+  public synchronized Vector<TaskInProgress> reportCleanupTIPs(
+                                               boolean shouldBeComplete) {
+    
+    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
+    for (int i = 0; i < cleanup.length; i++) {
+      if (cleanup[i].isComplete() == shouldBeComplete) {
+        results.add(cleanup[i]);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Return a vector of setup TaskInProgress objects
+   */
+  public synchronized Vector<TaskInProgress> reportSetupTIPs(
+                                               boolean shouldBeComplete) {
+    
+    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
+    for (int i = 0; i < setup.length; i++) {
+      if (setup[i].isComplete() == shouldBeComplete) {
+        results.add(setup[i]);
+      }
+    }
+    return results;
+  }
+
+  ////////////////////////////////////////////////////
+  // Status update methods
+  ////////////////////////////////////////////////////
+
+  /**
+   * Assuming {@link JobTracker} is locked on entry.
+   */
+  public synchronized void updateTaskStatus(TaskInProgress tip, 
+                                            TaskStatus status) {
+
+    double oldProgress = tip.getProgress();   // save old progress
+    boolean wasRunning = tip.isRunning();
+    boolean wasComplete = tip.isComplete();
+    boolean wasPending = tip.isOnlyCommitPending();
+    TaskAttemptID taskid = status.getTaskID();
+    boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
+
+    
+    // If the TIP is already completed and the task reports as SUCCEEDED then 
+    // mark the task as KILLED.
+    // In the case of a task that needs no promotion, the tasktracker will
+    // mark the task as SUCCEEDED.
+    // If the user has requested to kill the task but the TT reported
+    // SUCCEEDED, mark the task as KILLED.
+    if ((wasComplete || tip.wasKilled(taskid)) && 
+        (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
+      status.setRunState(TaskStatus.State.KILLED);
+    }
+    
+    // If the job is complete or task-cleanup is switched off
+    // and a task has just reported its state as FAILED_UNCLEAN/KILLED_UNCLEAN, 
+    // make the task's state FAILED/KILLED without launching cleanup attempt.
+    // Note that if task is already a cleanup attempt, 
+    // we don't change the state to make sure the task gets a killTaskAction
+    if ((this.isComplete() || jobFailed || jobKilled || !taskCleanupNeeded) && 
+        !tip.isCleanupAttempt(taskid)) {
+      if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+        status.setRunState(TaskStatus.State.FAILED);
+      } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
+        status.setRunState(TaskStatus.State.KILLED);
+      }
+    }
+    
+    boolean change = tip.updateStatus(status);
+    if (change) {
+      TaskStatus.State state = status.getRunState();
+      // get the TaskTrackerStatus where the task ran 
+      TaskTracker taskTracker = 
+        this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
+      TaskTrackerStatus ttStatus = 
+        (taskTracker == null) ? null : taskTracker.getStatus();
+      String taskTrackerHttpLocation = null; 
+
+      if (null != ttStatus){
+        String host;
+        if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
+          host = NetUtils.getStaticResolution(ttStatus.getHost());
+        } else {
+          host = ttStatus.getHost();
+        }
+        taskTrackerHttpLocation = "http://" + host + ":"
+            + ttStatus.getHttpPort(); 
+      }
+
+      TaskCompletionEvent taskEvent = null;
+      if (state == TaskStatus.State.SUCCEEDED) {
+        taskEvent = new TaskCompletionEvent(
+                                            taskCompletionEventTracker, 
+                                            taskid,
+                                            tip.idWithinJob(),
+                                            status.getIsMap() &&
+                                            !tip.isJobCleanupTask() &&
+                                            !tip.isJobSetupTask(),
+                                            TaskCompletionEvent.Status.SUCCEEDED,
+                                            taskTrackerHttpLocation 
+                                           );
+        taskEvent.setTaskRunTime((int)(status.getFinishTime() 
+                                       - status.getStartTime()));
+        tip.setSuccessEventNumber(taskCompletionEventTracker); 
+      } else if (state == TaskStatus.State.COMMIT_PENDING) {
+        // If it is the first attempt reporting COMMIT_PENDING
+        // ask the task to commit.
+        if (!wasComplete && !wasPending) {
+          tip.doCommit(taskid);
+        }
+        return;
+      } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
+                 state == TaskStatus.State.KILLED_UNCLEAN) {
+        tip.incompleteSubTask(taskid, this.status);
+        // add this task, to be rescheduled as cleanup attempt
+        if (tip.isMapTask()) {
+          mapCleanupTasks.add(taskid);
+        } else {
+          reduceCleanupTasks.add(taskid);
+        }
+        // Remove the task entry from jobtracker
+        jobtracker.removeTaskEntry(taskid);
+      }
+      // For a failed task, update the JT data structures.
+      else if (state == TaskStatus.State.FAILED ||
+               state == TaskStatus.State.KILLED) {
+        // Get the event number for the (possibly) previously successful
+        // task. If there exists one, then set that status to OBSOLETE 
+        int eventNumber;
+        if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+          TaskCompletionEvent t = 
+            this.taskCompletionEvents.get(eventNumber);
+          if (t.getTaskAttemptId().equals(taskid))
+            t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+        }
+        
+        // Tell the job to fail the relevant task
+        failedTask(tip, taskid, status, taskTracker,
+                   wasRunning, wasComplete, wasAttemptRunning);
+
+        // Did the task failure lead to tip failure?
+        TaskCompletionEvent.Status taskCompletionStatus = 
+          (state == TaskStatus.State.FAILED ) ?
+              TaskCompletionEvent.Status.FAILED :
+              TaskCompletionEvent.Status.KILLED;
+        if (tip.isFailed()) {
+          taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
+        }
+        taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
+                                            taskid,
+                                            tip.idWithinJob(),
+                                            status.getIsMap() &&
+                                            !tip.isJobCleanupTask() &&
+                                            !tip.isJobSetupTask(),
+                                            taskCompletionStatus, 
+                                            taskTrackerHttpLocation
+                                           );
+      }          
+
+      // Add the 'complete' task i.e. successful/failed
+      // It _is_ safe to add the TaskCompletionEvent.Status.SUCCEEDED
+      // *before* calling TIP.completedTask since:
+      // a. One and only one task of a TIP is declared as a SUCCESS, the
+      //    others (speculative tasks) are marked KILLED
+      // b. TIP.completedTask *does not* throw _any_ exception at all.
+      if (taskEvent != null) {
+        this.taskCompletionEvents.add(taskEvent);
+        taskCompletionEventTracker++;
+        JobTrackerStatistics.TaskTrackerStat ttStat = jobtracker.
+           getStatistics().getTaskTrackerStat(tip.machineWhereTaskRan(taskid));
+        if(ttStat != null) { // ttStat can be null in case of lost tracker
+          ttStat.incrTotalTasks();
+        }
+        if (state == TaskStatus.State.SUCCEEDED) {
+          completedTask(tip, status);
+          if(ttStat != null) {
+            ttStat.incrSucceededTasks();
+          }
+        }
+      }
+    }
+        
+    //
+    // Update JobInProgress status
+    //
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Taking progress for " + tip.getTIPId() + " from " + 
+                 oldProgress + " to " + tip.getProgress());
+    }
+    
+    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
+      double progressDelta = tip.getProgress() - oldProgress;
+      if (tip.isMapTask()) {
+          this.status.setMapProgress((float) (this.status.mapProgress() +
+                                              progressDelta / maps.length));
+      } else {
+        this.status.setReduceProgress((float) (this.status.reduceProgress() + 
+                                           (progressDelta / reduces.length)));
+      }
+    }
+  }
+
+  /**
+   * Returns the job-level counters.
+   * 
+   * @return the job-level counters.
+   */
+  public synchronized Counters getJobCounters() {
+    return jobCounters;
+  }
+  
+  /**
+   *  Returns map phase counters by summing over all map tasks in progress.
+   */
+  public synchronized Counters getMapCounters() {
+    return incrementTaskCounters(new Counters(), maps);
+  }
+    
+  /**
+   *  Returns reduce phase counters by summing over all reduce tasks in progress.
+   */
+  public synchronized Counters getReduceCounters() {
+    return incrementTaskCounters(new Counters(), reduces);
+  }
+    
+  /**
+   *  Returns the total job counters, by adding together the job, 
+   *  the map and the reduce counters.
+   */
+  public Counters getCounters() {
+    Counters result = new Counters();
+    synchronized (this) {
+      result.incrAllCounters(getJobCounters());
+    }
+
+    // the counters of TIPs are not updated in place.
+    // hence read-only access is ok without any locks
+    incrementTaskCounters(result, maps);
+    return incrementTaskCounters(result, reduces);
+  }
+    
+  /**
+   * Increments the counters with the counters from each task.
+   * @param counters the counters to increment
+   * @param tips the tasks to add in to counters
+   * @return counters the same object passed in as counters
+   */
+  private Counters incrementTaskCounters(Counters counters,
+                                         TaskInProgress[] tips) {
+    for (TaskInProgress tip : tips) {
+      counters.incrAllCounters(tip.getCounters());
+    }
+    return counters;
+  }
+
+  /////////////////////////////////////////////////////
+  // Create/manage tasks
+  /////////////////////////////////////////////////////
+  /**
+   * Return a MapTask, if appropriate, to run on the given tasktracker
+   */
+  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
+                                            int clusterSize, 
+                                            int numUniqueHosts,
+                                            int maxCacheLevel
+                                           ) throws IOException {
+    if (status.getRunState() != JobStatus.RUNNING) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      return null;
+    }
+       
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
+        maxCacheLevel);
+    if (target == -1) {
+      return null;
+    }
+    
+    Task result = maps[target].getTaskToRun(tts.getTrackerName());
+    if (result != null) {
+      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+    }
+
+    return result;
+  } 
+  
+  /**
+   * Return a MapTask, if appropriate, to run on the given tasktracker
+   */
+  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
+                                            int clusterSize, 
+                                            int numUniqueHosts
+                                           ) throws IOException {
+    return obtainNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel);
+  }    
+
+  /*
+   * Return task cleanup attempt if any, to run on a given tracker
+   */
+  public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+                                                 boolean isMapSlot)
+  throws IOException {
+    if (!tasksInited.get()) {
+      return null;
+    }
+    synchronized (this) {
+      if (this.status.getRunState() != JobStatus.RUNNING || 
+          jobFailed || jobKilled) {
+        return null;
+      }
+      String taskTracker = tts.getTrackerName();
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
+      }
+      TaskAttemptID taskid = null;
+      TaskInProgress tip = null;
+      if (isMapSlot) {
+        if (!mapCleanupTasks.isEmpty()) {
+          taskid = mapCleanupTasks.remove(0);
+          tip = maps[taskid.getTaskID().getId()];
+        }
+      } else {
+        if (!reduceCleanupTasks.isEmpty()) {
+          taskid = reduceCleanupTasks.remove(0);
+          tip = reduces[taskid.getTaskID().getId()];
+        }
+      }
+      if (tip != null) {
+        return tip.addRunningTask(taskid, taskTracker, true);
+      }
+      return null;
+    }
+  }
+  
+  public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
+                                                     int clusterSize, 
+                                                     int numUniqueHosts)
+  throws IOException {
+    if (!tasksInited.get()) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      return null;
+    }
+  
+    return obtainNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
+  }
+  
+  public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
+                                                    int clusterSize, 
+                                                    int numUniqueHosts)
+  throws IOException {
+    if (!tasksInited.get()) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      return null;
+    }
+  
+    return obtainNewMapTask(tts, clusterSize, numUniqueHosts,
+        NON_LOCAL_CACHE_LEVEL);
+  }
+  
+  /**
+   * Return a CleanupTask, if appropriate, to run on the given tasktracker
+   * 
+   */
+  public Task obtainJobCleanupTask(TaskTrackerStatus tts, 
+                                             int clusterSize, 
+                                             int numUniqueHosts,
+                                             boolean isMapSlot
+                                            ) throws IOException {
+    if(!tasksInited.get() || !jobSetupCleanupNeeded) {
+      return null;
+    }
+    
+    synchronized(this) {
+      if (!canLaunchJobCleanupTask()) {
+        return null;
+      }
+      
+      String taskTracker = tts.getTrackerName();
+      // Update the last-known clusterSize
+      this.clusterSize = clusterSize;
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
+      }
+      
+      List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
+      if (isMapSlot) {
+        cleanupTaskList.add(cleanup[0]);
+      } else {
+        cleanupTaskList.add(cleanup[1]);
+      }
+      TaskInProgress tip = findTaskFromList(cleanupTaskList,
+                             tts, numUniqueHosts, false);
+      if (tip == null) {
+        return null;
+      }
+      
+      // Now launch the cleanupTask
+      Task result = tip.getTaskToRun(tts.getTrackerName());
+      if (result != null) {
+        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+        if (jobFailed) {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
+                .State.FAILED);
+        } else if (jobKilled) {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
+                .State.KILLED);
+        } else {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce
+                .JobStatus.State.SUCCEEDED);
+        }
+      }
+      return result;
+    }
+    
+  }
+  
+  /**
+   * Check whether cleanup task can be launched for the job.
+   * 
+   * The cleanup task can be launched if it has not already been launched,
+   * setup has finished, and either the job has been killed or failed,
+   * or all maps and reduces are complete.
+   * @return true/false
+   */
+  private synchronized boolean canLaunchJobCleanupTask() {
+    // check if the job is running
+    if (status.getRunState() != JobStatus.RUNNING &&
+        status.getRunState() != JobStatus.PREP) {
+      return false;
+    }
+    // check if the cleanup task has already been launched, or if setup has
+    // not finished yet. The latter check is useful when the number of maps
+    // is zero.
+    if (launchedCleanup || !isSetupFinished()) {
+      return false;
+    }
+    // check if job has failed or killed
+    if (jobKilled || jobFailed) {
+      return true;
+    }
+    // Check if all maps and reducers have finished.
+    boolean launchCleanupTask = 
+        ((finishedMapTasks + failedMapTIPs) == (numMapTasks));
+    if (launchCleanupTask) {
+      launchCleanupTask = 
+        ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
+    }
+    return launchCleanupTask;
+  }
+
+  /**
+   * Return a SetupTask, if appropriate, to run on the given tasktracker
+   * 
+   */
+  public Task obtainJobSetupTask(TaskTrackerStatus tts, 
+                                             int clusterSize, 
+                                             int numUniqueHosts,
+                                             boolean isMapSlot
+                                            ) throws IOException {
+    if(!tasksInited.get() || !jobSetupCleanupNeeded) {
+      return null;
+    }
+    
+    synchronized(this) {
+      if (!canLaunchSetupTask()) {
+        return null;
+      }
+      String taskTracker = tts.getTrackerName();
+      // Update the last-known clusterSize
+      this.clusterSize = clusterSize;
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
+      }
+      
+      List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
+      if (isMapSlot) {
+        setupTaskList.add(setup[0]);
+      } else {
+        setupTaskList.add(setup[1]);
+      }
+      TaskInProgress tip = findTaskFromList(setupTaskList,
+                             tts, numUniqueHosts, false);
+      if (tip == null) {
+        return null;
+      }
+      
+      // Now launch the setupTask
+      Task result = tip.getTaskToRun(tts.getTrackerName());
+      if (result != null) {
+        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+      }
+      return result;
+    }
+  }
+  
+  public synchronized boolean scheduleReduces() {
+    return finishedMapTasks >= completedMapsForReduceSlowstart;
+  }
+  
+  /**
+   * Check whether setup task can be launched for the job.
+   * 
+   * The setup task can be launched after the tasks have been inited,
+   * while the job is in the PREP state, if it has not already been launched
+   * and the job has not been killed or failed.
+   * @return true/false
+   */
+  private synchronized boolean canLaunchSetupTask() {
+    return (tasksInited.get() && status.getRunState() == JobStatus.PREP && 
+           !launchedSetup && !jobKilled && !jobFailed);
+  }
+  
+
+  /**
+   * Return a ReduceTask, if appropriate, to run on the given tasktracker.
+   * We don't have cache-sensitivity for reduce tasks, as they
+   *  work on temporary MapRed files.  
+   */
+  public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
+                                               int clusterSize,
+                                               int numUniqueHosts
+                                              ) throws IOException {
+    if (status.getRunState() != JobStatus.RUNNING) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      return null;
+    }
+    
+    // Ensure we have sufficient map outputs ready to shuffle before 
+    // scheduling reduces
+    if (!scheduleReduces()) {
+      return null;
+    }
+
+    int target = findNewReduceTask(tts, clusterSize, numUniqueHosts);
+    if (target == -1) {
+      return null;
+    }
+    
+    Task result = reduces[target].getTaskToRun(tts.getTrackerName());
+    if (result != null) {
+      addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
+    }
+
+    return result;
+  }
+  
+  // returns the (cache) level at which the nodes match
+  private int getMatchingLevelForNodes(Node n1, Node n2) {
+    int count = 0;
+    do {
+      if (n1.equals(n2)) {
+        return count;
+      }
+      ++count;
+      n1 = n1.getParent();
+      n2 = n2.getParent();
+    } while (n1 != null);
+    return this.maxLevel;
+  }
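+  // For illustration: with a two-level topology (host, rack), two trackers
+  // on the same host match at level 0, trackers on different hosts of the
+  // same rack match at level 1, and trackers on different racks resolve to
+  // this.maxLevel.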
+
+  /**
+   * Populate the data structures as a task is scheduled.
+   * 
+   * Assuming {@link JobTracker} is locked on entry.
+   * 
+   * @param tip The tip for which the task is added
+   * @param id The attempt-id for the task
+   * @param tts task-tracker status
+   * @param isScheduled Whether this task is scheduled from the JT or has 
+   *        joined back upon restart
+   */
+  synchronized void addRunningTaskToTIP(TaskInProgress tip, TaskAttemptID id, 
+                                        TaskTrackerStatus tts, 
+                                        boolean isScheduled) {
+    // Make an entry in the tip if the attempt is not scheduled, i.e. it was
+    // added externally
+    if (!isScheduled) {
+      tip.addRunningTask(id, tts.getTrackerName());
+    }
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
+
+    // keeping the earlier ordering intact
+    TaskType name;
+    String splits = "";
+    Enum counter = null;
+    if (tip.isJobSetupTask()) {
+      launchedSetup = true;
+      name = TaskType.JOB_SETUP;
+    } else if (tip.isJobCleanupTask()) {
+      launchedCleanup = true;
+      name = TaskType.JOB_CLEANUP;
+    } else if (tip.isMapTask()) {
+      ++runningMapTasks;
+      name = TaskType.MAP;
+      counter = JobCounter.TOTAL_LAUNCHED_MAPS;
+      splits = tip.getSplitNodes();
+      if (tip.isSpeculating()) {
+        speculativeMapTasks++;
+        metrics.speculateMap(id);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Chosen speculative task, current speculativeMap task count: "
+                    + speculativeMapTasks);
+        }
+      }
+      metrics.launchMap(id);
+    } else {
+      ++runningReduceTasks;
+      name = TaskType.REDUCE;
+      counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
+      if (tip.isSpeculating()) {
+        speculativeReduceTasks++;
+        metrics.speculateReduce(id);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Chosen speculative task, current speculativeReduce task count: "
+                    + speculativeReduceTasks);
+        }
+      }
+      metrics.launchReduce(id);
+    }
+    // Note that the logs are for the scheduled tasks only. Tasks that join on 
+    // restart already have their logs in place.
+    if (tip.isFirstAttempt(id)) {
+      TaskStartedEvent tse = new TaskStartedEvent(tip.getTIPId(), 
+          tip.getExecStartTime(),
+          name, splits);
+      
+      jobHistory.logEvent(tse, tip.getJob().jobId);
+      setFirstTaskLaunchTime(tip);
+    }
+    if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
+      jobCounters.incrCounter(counter, 1);
+    }
+    
+    //TODO The only problem with these counters would be on restart.
+    // The jobtracker updates the counter only when the task that is scheduled
+    // is from a non-running tip and is local (data, rack ...). But upon restart
+    // as the reports come from the task tracker, there is no good way to infer
+    // when exactly to increment the locality counters. The only solution is to 
+    // increment the counters for all the tasks irrespective of 
+    //    - whether the tip is running or not
+    //    - whether it's a speculative task or not
+    //
+    // So to simplify, increment the data locality counter whenever there is 
+    // data locality.
+    if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
+      // increment the data locality counter for maps
+      int level = getLocalityLevel(tip, tts);
+      switch (level) {
+      case 0 :
+        LOG.info("Choosing data-local task " + tip.getTIPId());
+        jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
+        metrics.launchDataLocalMap(id);
+        break;
+      case 1:
+        LOG.info("Choosing rack-local task " + tip.getTIPId());
+        jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
+        metrics.launchRackLocalMap(id);
+        break;
+      default :
+        // check if there is any locality
+        if (level != this.maxLevel) {
+          LOG.info("Choosing cached task at level " + level + tip.getTIPId());
+          jobCounters.incrCounter(JobCounter.OTHER_LOCAL_MAPS, 1);
+        }
+        break;
+      }
+    }
+  }
+
+  void setFirstTaskLaunchTime(TaskInProgress tip) {
+    TaskType key = getTaskType(tip);
+
+    synchronized(firstTaskLaunchTimes) {
+      // Could be optimized to do only one lookup with a little more code
+      if (!firstTaskLaunchTimes.containsKey(key)) {
+        firstTaskLaunchTimes.put(key, tip.getExecStartTime());
+      }
+    }
+  }
+    
+  public static String convertTrackerNameToHostName(String trackerName) {
+    // Ugly!
+    // Convert the trackerName to its host name
+    int indexOfColon = trackerName.indexOf(":");
+    String trackerHostName = (indexOfColon == -1) ? 
+      trackerName : 
+      trackerName.substring(0, indexOfColon);
+    return trackerHostName.substring("tracker_".length());
+  }
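+  // For illustration (hypothetical name): a tracker name such as
+  // "tracker_host1.example.com:localhost/127.0.0.1:52687" maps to
+  // "host1.example.com" - the text between the "tracker_" prefix and the
+  // first ':'.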
+    
+  /**
+   * Note that a task has failed on a given tracker and add the tracker  
+   * to the blacklist iff too many trackers in the cluster, i.e. 
+   * (clusterSize * CLUSTER_BLACKLIST_PERCENT), haven't already turned 'flaky'.
+   * 
+   * @param trackerName name of the task-tracker on which a task failed
+   * @param taskTracker the task-tracker whose slot reservations may need to 
+   *        be cancelled if it turns 'flaky'
+   */
+  synchronized void addTrackerTaskFailure(String trackerName, 
+                                          TaskTracker taskTracker) {
+    if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { 
+      String trackerHostName = convertTrackerNameToHostName(trackerName);
+
+      Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
+      if (trackerFailures == null) {
+        trackerFailures = 0;
+      }
+      trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
+
+      // Check if this tasktracker has turned 'flaky'
+      if (trackerFailures.intValue() == maxTaskFailuresPerTracker) {
+        ++flakyTaskTrackers;
+        
+        // Cancel reservations if appropriate
+        if (taskTracker != null) {
+          if (trackersReservedForMaps.containsKey(taskTracker)) {
+            taskTracker.unreserveSlots(TaskType.MAP, this);
+          }
+          if (trackersReservedForReduces.containsKey(taskTracker)) {
+            taskTracker.unreserveSlots(TaskType.REDUCE, this);
+          }
+        }
+        LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
+      }
+    }
+  }
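+  // For illustration (assumed values, not taken from this file): if
+  // CLUSTER_BLACKLIST_PERCENT were 0.25 and the cluster had 40 trackers,
+  // per-tracker failures would stop being recorded for this job once 10
+  // trackers had turned 'flaky'; an individual tracker turns 'flaky' when
+  // its failure count for the job reaches maxTaskFailuresPerTracker.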
+  
+  public synchronized void reserveTaskTracker(TaskTracker taskTracker,
+                                              TaskType type, int numSlots) {
+    Map<TaskTracker, FallowSlotInfo> map =
+      (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
+    
+    long now = System.currentTimeMillis();
+    
+    FallowSlotInfo info = map.get(taskTracker);
+    int reservedSlots = 0;
+    if (info == null) {
+      info = new FallowSlotInfo(now, numSlots);
+      reservedSlots = numSlots;
+    } else {
+      // Increment metering info if the reservation is changing
+      if (info.getNumSlots() != numSlots) {
+        Enum<JobCounter> counter = 
+          (type == TaskType.MAP) ? 
+              JobCounter.FALLOW_SLOTS_MILLIS_MAPS : 
+              JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
+        long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
+        jobCounters.incrCounter(counter, fallowSlotMillis);
+        
+        // Update the reservation
+        reservedSlots = numSlots - info.getNumSlots();
+        info.setTimestamp(now);
+        info.setNumSlots(numSlots);
+      }
+    }
+    map.put(taskTracker, info);
+    if (type == TaskType.MAP) {
+      jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+    }
+    else {
+      jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+    }
+    jobtracker.incrementReservations(type, reservedSlots);
+  }
+  
+  public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
+                                                TaskType type) {
+    Map<TaskTracker, FallowSlotInfo> map =
+      (type == TaskType.MAP) ? trackersReservedForMaps : 
+                               trackersReservedForReduces;
+
+    FallowSlotInfo info = map.get(taskTracker);
+    if (info == null) {
+      LOG.warn("Cannot find information about fallow slots for " + 
+               taskTracker.getTrackerName());
+      return;
+    }
+    
+    long now = System.currentTimeMillis();
+
+    Enum<JobCounter> counter = 
+      (type == TaskType.MAP) ? 
+          JobCounter.FALLOW_SLOTS_MILLIS_MAPS : 
+          JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
+    long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
+    jobCounters.incrCounter(counter, fallowSlotMillis);
+
+    map.remove(taskTracker);
+    if (type == TaskType.MAP) {
+      jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+    }
+    else {
+      jobtracker.getInstrumentation().decReservedReduceSlots(
+        info.getNumSlots());
+    }
+    jobtracker.decrementReservations(type, info.getNumSlots());
+  }
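+  // For illustration: if 4 slots were reserved on a tracker at timestamp
+  // 100,000 ms and unreserved at 160,000 ms, the fallow-slot counter is
+  // incremented by (160,000 - 100,000) * 4 = 240,000 slot-milliseconds.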
+  
+  public int getNumReservedTaskTrackersForMaps() {
+    return trackersReservedForMaps.size();
+  }
+  
+  public int getNumReservedTaskTrackersForReduces() {
+    return trackersReservedForReduces.size();
+  }
+  
+  private int getTrackerTaskFailures(String trackerName) {
+    String trackerHostName = convertTrackerNameToHostName(trackerName);
+    Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
+    return (failedTasks != null) ? failedTasks.intValue() : 0; 
+  }
+    
+  /**
+   * Get the black listed trackers for the job
+   * 
+   * @return List of blacklisted tracker names
+   */
+  List<String> getBlackListedTrackers() {
+    List<String> blackListedTrackers = new ArrayList<String>();
+    for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
+       if (e.getValue().intValue() >= maxTaskFailuresPerTracker) {
+         blackListedTrackers.add(e.getKey());
+       }
+    }
+    return blackListedTrackers;
+  }
+  
+  /**
+   * Get the no. of 'flaky' tasktrackers for a given job.
+   * 
+   * @return the no. of 'flaky' tasktrackers for a given job.
+   */
+  int getNoOfBlackListedTrackers() {
+    return flakyTaskTrackers;
+  }
+    
+  /**
+   * Get the information on tasktrackers and no. of errors which occurred
+   * on them for a given job. 
+   * 
+   * @return the map of tasktrackers and no. of errors which occurred
+   *         on them for a given job. 
+   */
+  synchronized Map<String, Integer> getTaskTrackerErrors() {
+    // Clone the 'trackerToFailuresMap' and return the copy
+    Map<String, Integer> trackerErrors = 
+      new TreeMap<String, Integer>(trackerToFailuresMap);
+    return trackerErrors;
+  }
+
+  /**
+   * Remove a map TIP from the lists for running maps.
+   * Called when a map fails/completes (note if a map is killed,
+   * it won't be present in the list since it was completed earlier)
+   * @param tip the tip that needs to be retired
+   */
+  private synchronized void retireMap(TaskInProgress tip) {
+    if (runningMapCache == null) {
+      LOG.warn("Running cache for maps missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    
+    String[] splitLocations = tip.getSplitLocations();
+
+    // Remove the TIP from the list for running non-local maps
+    if (splitLocations == null || splitLocations.length == 0) {
+      nonLocalRunningMaps.remove(tip);
+      return;
+    }
+
+    // Remove from the running map caches
+    for(String host: splitLocations) {
+      Node node = jobtracker.getNode(host);
+
+      for (int j = 0; j < maxLevel; ++j) {
+        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+        if (hostMaps != null) {
+          hostMaps.remove(tip);
+          if (hostMaps.size() == 0) {
+            runningMapCache.remove(node);
+          }
+        }
+        node = node.getParent();
+      }
+    }
+  }
+
+  /**
+   * Remove a reduce TIP from the list for running-reduces
+   * Called when a reduce fails/completes
+   * @param tip the tip that needs to be retired
+   */
+  private synchronized void retireReduce(TaskInProgress tip) {
+    if (runningReduces == null) {
+      LOG.warn("Running list for reducers missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    runningReduces.remove(tip);
+  }
+
+  /**
+   * Adds a map tip to the list of running maps.
+   * @param tip the tip that needs to be scheduled as running
+   */
+  protected synchronized void scheduleMap(TaskInProgress tip) {
+    
+    runningMapTaskStats.add(0.0f);
+    if (runningMapCache == null) {
+      LOG.warn("Running cache for maps is missing!! " 
+               + "Job details are missing.");
+      return;
+    }
+    String[] splitLocations = tip.getSplitLocations();
+
+    // Add the TIP to the list of non-local running TIPs
+    if (splitLocations == null || splitLocations.length == 0) {
+      nonLocalRunningMaps.add(tip);
+      return;
+    }
+
+    for(String host: splitLocations) {
+      Node node = jobtracker.getNode(host);
+
+      for (int j = 0; j < maxLevel; ++j) {
+        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+        if (hostMaps == null) {
+          // create a cache if needed
+          hostMaps = new LinkedHashSet<TaskInProgress>();
+          runningMapCache.put(node, hostMaps);
+        }
+        hostMaps.add(tip);
+        node = node.getParent();
+      }
+    }
+  }
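+  // For illustration: a map TIP whose split is hosted on node h1 under rack
+  // r1 is added to runningMapCache under h1 and under each ancestor up to
+  // maxLevel (e.g. r1), so later lookups against the running cache can find
+  // it at both the node and the rack level.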
+  
+  /**
+   * Adds a reduce tip to the list of running reduces
+   * @param tip the tip that needs to be scheduled as running
+   */
+  protected synchronized void scheduleReduce(TaskInProgress tip) {
+    runningReduceTaskStats.add(0.0f);
+    if (runningReduces == null) {
+      LOG.warn("Running cache for reducers missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    runningReduces.add(tip);
+  }
+  
+  /**
+   * Adds the failed TIP to the front of the list of non-running maps
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failMap(TaskInProgress tip) {
+    if (nonRunningMapCache == null) {
+      LOG.warn("Non-running cache for maps missing!! "
+               + "Job details are missing.");
+      return;
+    }
+
+    // 1. It's added everywhere since other nodes (having this split local)
+    //    might have removed this tip from their local cache
+    // 2. Give high priority to failed tip - fail early
+
+    String[] splitLocations = tip.getSplitLocations();
+
+    // Add the TIP in the front of the list for non-local non-running maps
+    if (splitLocations.length == 0) {
+      nonLocalMaps.add(0, tip);
+      return;
+    }
+
+    for(String host: splitLocations) {
+      Node node = jobtracker.getNode(host);
+      
+      for (int j = 0; j < maxLevel; ++j) {
+        List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
+        if (hostMaps == null) {
+          hostMaps = new LinkedList<TaskInProgress>();
+          nonRunningMapCache.put(node, hostMaps);
+        }
+        hostMaps.add(0, tip);
+        node = node.getParent();
+      }
+    }
+  }
+  
+  /**
+   * Adds a failed TIP to the front of the list of non-running reduces
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failReduce(TaskInProgress tip) {
+    if (nonRunningReduces == null) {
+      LOG.warn("Failed cache for reducers missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    nonRunningReduces.add(0, tip);
+  }
+  
+  /**
+   * Find a non-running task in the passed list of TIPs
+   * @param tips a collection of TIPs
+   * @param ttStatus the status of tracker that has requested a task to run
+   * @param numUniqueHosts number of unique hosts that run task trackers
+   * @param removeFailedTip whether to remove the failed tips
+   */
+  private synchronized TaskInProgress findTaskFromList(
+      Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
+      int numUniqueHosts,
+      boolean removeFailedTip) {
+    Iterator<TaskInProgress> iter = tips.iterator();
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+
+      // Select a tip if
+      //   1. runnable   : still needs to be run and is not completed
+      //   2. ~running   : no other node is running it
+      //   3. earlier attempt failed : has not failed on this host
+      //                               and has failed on all the other hosts
+      // A TIP is removed from the list if 
+      // (1) this tip is scheduled
+      // (2) if the passed list is a level 0 (host) cache
+      // (3) when the TIP is non-schedulable (running, killed, complete)
+      if (tip.isRunnable() && !tip.isRunning()) {
+        // check if the tip has failed on this host
+        if (!tip.hasFailedOnMachine(ttStatus.getHost()) || 
+             tip.getNumberOfFailedMachines() >= numUniqueHosts) {
+          // check if the tip has failed on all the nodes
+          iter.remove();
+          return tip;
+        } else if (removeFailedTip) { 
+          // the case where we want to remove a failed tip from the host cache
+          // point#3 in the TIP removal logic above
+          iter.remove();
+        }
+      } else {
+        // see point#3 in the comment above for TIP removal logic
+        iter.remove();
+      }
+    }
+    return null;
+  }
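+  // For illustration: a TIP that previously failed on the requesting host is
+  // skipped (and, for a level-0 host cache, removed via removeFailedTip),
+  // unless it has already failed on at least numUniqueHosts hosts, in which
+  // case it is handed out anyway rather than being starved.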
+  
+  public boolean hasSpeculativeMaps() {
+    return hasSpeculativeMaps;
+  }
+
+  public boolean hasSpeculativeReduces() {
+    return hasSpeculativeReduces;
+  }
+
+  /**
+   * Retrieve a task for speculation.
+   * If a task slot becomes available and there are fewer than SpeculativeCap
+   * speculative tasks running: 
+   *  1) Ignore the request if the TT's progressRate is < SlowNodeThreshold
+   *  2) Choose candidate tasks - those tasks whose progress rate is below
+   *     slowTaskThreshold * mean(progress-rates)
+   *  3) Speculate the task that's expected to complete last
+   * @param list pool of tasks to choose from
+   * @param taskTrackerName the name of the TaskTracker asking for a task
+   * @param taskTrackerHost the hostname of the TaskTracker asking for a task
+   * @param taskType the type of task (MAP/REDUCE) that we are considering
+   * @return the TIP to speculatively re-execute
+   */
+  protected synchronized TaskInProgress findSpeculativeTask(
+      Collection<TaskInProgress> list, String taskTrackerName, 
+      String taskTrackerHost, TaskType taskType) {
+    if (list.isEmpty()) {
+      return null;
+    }
+    long now = JobTracker.getClock().getTime();
+
+    // Don't return anything if either the TaskTracker is slow or we have
+    // already launched enough speculative tasks in the cluster.
+    if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) {
+      return null;
+    }
+
+    TaskInProgress slowestTIP = null;
+    Comparator<TaskInProgress> lateComparator =
+      new EstimatedTimeLeftComparator(now);
+
+    Iterator<TaskInProgress> iter = list.iterator();
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+
+      // If this tip has already run on this machine once or it doesn't need any
+      // more speculative attempts, skip it.
+      if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
+          !tip.canBeSpeculated(now)) {
+          continue;
+      }
+
+      if (slowestTIP == null) {
+        slowestTIP = tip;
+      } else {
+        slowestTIP =
+            lateComparator.compare(tip, slowestTIP) < 0 ? tip : slowestTIP;
+      }
+    }
+
+    if (slowestTIP != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Chose task " + slowestTIP.getTIPId() + ". Statistics: Task's : " +
+            slowestTIP.getCurrentProgressRate(now) + " Job's : " + 
+            (slowestTIP.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
+      }
+    }
+
+    return slowestTIP;
+  }
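+  // For illustration (following the javadoc above and the
+  // EstimatedTimeLeftComparator below): among the candidates that may still
+  // be speculated, the one with the largest estimated remaining time,
+  // roughly (1 - progress) / progressRate, i.e. the task expected to finish
+  // last, is the one chosen for re-execution.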
+
+  /**
+   * Find new map task
+   * @param tts The task tracker that is asking for a task
+   * @param clusterSize The number of task trackers in the cluster
+   * @param numUniqueHosts The number of hosts that run task trackers
+   * @param maxCacheLevel The maximum topology level until which to schedule
+   *                      maps. 
+   *                      A value of {@link #anyCacheLevel} implies any 
+   *                      available task (node-local, rack-local, off-switch and 
+   *                      speculative tasks).
+   *                      A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
+   *                      off-switch/speculative tasks should be scheduled.
+   * @return the index in tasks of the selected task (or -1 for no task)
+   */
+  private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
+                                          final int clusterSize,
+                                          final int numUniqueHosts,
+                                          final int maxCacheLevel) {
+    String taskTrackerName = tts.getTrackerName();
+    String taskTrackerHost = tts.getHost();
+    if (numMapTasks == 0) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("No maps to schedule for " + profile.getJobID());
+      }
+      return -1;
+    }
+
+    TaskInProgress tip = null;
+    
+    //
+    // Update the last-known clusterSize
+    //
+    this.clusterSize = clusterSize;
+
+    if (!shouldRunOnTaskTracker(taskTrackerName)) {
+      return -1;
+    }
+
+    // Check to ensure this TaskTracker has enough resources to 
+    // run tasks from this job
+    long outSize = resourceEstimator.getEstimatedMapOutputSize();
+    long availSpace = tts.getResourceStatus().getAvailableSpace();
+    if(availSpace < outSize) {
+      LOG.warn("No room for map task. Node " + tts.getHost() + 
+               " has " + availSpace + 
+               " bytes free; but we expect map to take " + outSize);
+
+      return -1; //see if a different TIP might work better. 
+    }
+    
+    
+    // For scheduling a map task, we have two caches and a list (optional)
+    //  I)   one for non-running task
+    //  II)  one for running task (this is for handling speculation)
+    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
+    //       the list is empty if all TIPs have associated locations
+
+    // First a look up is done on the non-running cache and on a miss, a look 
+    // up is done on the running cache. The order for lookup within the cache:
+    //   1. from local node to root [bottom up]
+    //   2. breadth wise for all the parent nodes at max level
+
+    // We fall back to a linear scan of the list (III above) if we have
+    // misses in the above caches
+
+    Node node = jobtracker.getNode(tts.getHost());
+    
+    //
+    // I) Non-running TIP :
+    // 
+
+    // 1. check from local node to the root [bottom up cache lookup]
+    //    i.e if the cache is available and the host has been resolved
+    //    (node!=null)
+    if (node != null) {
+      Node key = node;
+      int level = 0;
+      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask
+      // is called to schedule any task (node-local, rack-local, off-switch or
+      // speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
+      // findNewMapTask is to only schedule off-switch/speculative tasks
+      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
+      for (level = 0; level < maxLevelToSchedule; ++level) {
+        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
+        if (cacheForLevel != null) {
+          tip = findTaskFromList(cacheForLevel, tts, 
+              numUniqueHosts, level == 0);
+          if (tip != null) {
+            // Add to running cache
+            scheduleMap(tip);
+
+            // remove the cache if its empty
+            if (cacheForLevel.size() == 0) {
+              nonRunningMapCache.remove(key);
+            }
+
+            return tip.getIdWithinJob();
+          }
+        }
+        key = key.getParent();
+      }
+      
+      // Check if we need to only schedule a local task (node-local/rack-local)
+      if (level == maxCacheLevel) {
+        return -1;
+      }
+    }
+
+    //2. Search breadth-wise across parents at max level for non-running 
+    //   TIP if
+    //     - cache exists and there is a cache miss 
+    //     - node information for the tracker is missing (tracker's topology
+    //       info not obtained yet)
+
+    // collection of node at max level in the cache structure
+    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
+
+    // get the node parent at max level
+    Node nodeParentAtMaxLevel = 
+      (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
+    
+    for (Node parent : nodesAtMaxLevel) {
+
+      // skip the parent that has already been scanned
+      if (parent == nodeParentAtMaxLevel) {
+        continue;
+      }
+
+      List<TaskInProgress> cache = nonRunningMapCache.get(parent);
+      if (cache != null) {
+        tip = findTaskFromList(cache, tts, numUniqueHosts, false);
+        if (tip != null) {
+          // Add to the running cache
+          scheduleMap(tip);
+
+          // remove the cache if empty
+          if (cache.size() == 0) {
+            nonRunningMapCache.remove(parent);
+          }
+          LOG.info("Choosing a non-local task " + tip.getTIPId());
+          return tip.getIdWithinJob();
+        }
+      }
+    }
+
+    // 3. Search non-local tips for a new task
+    tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
+    if (tip != null) {
+      // Add to the running list
+      scheduleMap(tip);
+
+      LOG.info("Choosing a non-local task " + tip.getTIPId());
+      return tip.getIdWithinJob();
+    }
+
+    //
+    // II) Running TIP :
+    // 
+ 
+    if (hasSpeculativeMaps) {
+      tip = getSpeculativeMap(taskTrackerName, taskTrackerHost);
+      if (tip != null) {
+        return tip.getIdWithinJob();
+      }
+    }
+    return -1;
+  }
+
+  private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName, 
+      String taskTrackerHost) {
+
+    //////// Populate allTips with all TaskInProgress
+    Set<TaskInProgress> allTips = new HashSet<TaskInProgress>();
+    
+    // collection of node at max level in the cache structure
+    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
+    // Add all tasks from max-level nodes breadth-wise
+    for (Node parent : nodesAtMaxLevel) {
+      Set<TaskInProgress> cache = runningMapCache.get(parent);
+      if (cache != null) {
+        allTips.addAll(cache);
+      }
+    }
+    // Add all non-local TIPs
+    allTips.addAll(nonLocalRunningMaps);
+    
+    ///////// Select a TIP to run on
+    TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName, 
+        taskTrackerHost, TaskType.MAP);
+    
+    if (tip != null) {
+      LOG.info("Choosing map task " + tip.getTIPId() + 
+          " for speculative execution");
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No speculative map task found for tracker " + taskTrackerName);
+      }
+    }
+    return tip;
+  }
+  
+  /**
+   * Find new reduce task
+   * @param tts The task tracker that is asking for a task
+   * @param clusterSize The number of task trackers in the cluster
+   * @param numUniqueHosts The number of hosts that run task trackers
+   * @return the index in tasks of the selected task (or -1 for no task)
+   */
+  private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
+                                             int clusterSize,
+                                             int numUniqueHosts) {
+    String taskTrackerName = tts.getTrackerName();
+    String taskTrackerHost = tts.getHost();
+    if (numReduceTasks == 0) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("No reduces to schedule for " + profile.getJobID());
+      }
+      return -1;
+    }
+    TaskInProgress tip = null;
+    
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+
+    if (!shouldRunOnTaskTracker(taskTrackerName)) {
+      return -1;
+    }
+
+    long outSize = resourceEstimator.getEstimatedReduceInputSize();
+    long availSpace = tts.getResourceStatus().getAvailableSpace();
+    if(availSpace < outSize) {
+      LOG.warn("No room for reduce task. Node " + taskTrackerName + " has " +
+                availSpace + 
+               " bytes free; but we expect reduce input to take " + outSize);
+
+      return -1; //see if a different TIP might work better. 
+    }
+    
+    // 1. check for a never-executed reduce tip
+    // reducers don't have a locality cache, so the non-running list is
+    // scanned directly
+    tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
+    if (tip != null) {
+      scheduleReduce(tip);
+      return tip.getIdWithinJob();
+    }
+
+    // 2. check for a reduce tip to be speculated
+    if (hasSpeculativeReduces) {
+      tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost);
+      if (tip != null) {
+        return tip.getIdWithinJob();
+      }
+    }
+
+    return -1;
+  }
+
+  private synchronized TaskInProgress getSpeculativeReduce(
+      String taskTrackerName, String taskTrackerHost) {
+    TaskInProgress tip = findSpeculativeTask(
+        runningReduces, taskTrackerName, taskTrackerHost, TaskType.REDUCE);
+    if (tip != null) {
+      LOG.info("Choosing reduce task " + tip.getTIPId() + 
+          " for speculative execution");
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No speculative map task found for tracker "
+                  + taskTrackerHost);
+      }
+    }
+    return tip;
+  }
+
+  /**
+   * Check whether the maximum number of speculative tasks is
+   * already being executed.
+   * @param tasks the set of tasks to test
+   * @param type the type of task (MAP/REDUCE) that we are considering
+   * @return has the cap been reached?
+   */
+  private boolean atSpeculativeCap(Collection<TaskInProgress> tasks, 
+      TaskType type) {
+    float numTasks = tasks.size();
+    if (numTasks == 0) {
+      return true; // avoid divide by zero
+    }
+    int speculativeTaskCount = type == TaskType.MAP ? speculativeMapTasks 
+        : speculativeReduceTasks;
+    // allow more speculation (return false) while the number of speculative
+    // tasks is below max(10, 0.01 * total-slots, 0.1 * total-running-tasks)
+
+    if (speculativeTaskCount < MIN_SPEC_CAP) {
+      return false; // at least one slow tracker's worth of slots (default=10)
+    }
+    ClusterStatus c = jobtracker.getClusterStatus(false);
+    int numSlots = 
+      (type == TaskType.MAP ? c.getMaxMapTasks() : c.getMaxReduceTasks());
+    if ((float)speculativeTaskCount < numSlots * MIN_SLOTS_CAP) {
+      return false;
+    }
+    boolean atCap = 
+      (((float)(speculativeTaskCount) / numTasks) >= speculativeCap);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SpeculativeCap is " + speculativeCap +
+          ", specTasks/numTasks is " +
+          ((float)(speculativeTaskCount) / numTasks) +
+          ", so atSpecCap() is returning " + atCap);
+    }
+    return atCap;
+  }
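+  // For illustration (using the defaults noted in the comment above: 10,
+  // 0.01 and 0.1): with 12 speculative maps running, 2000 map slots in the
+  // cluster and 100 candidate tasks, 12 >= 10 but 12 < 0.01 * 2000 = 20,
+  // so atSpeculativeCap() returns false and another map may be speculated.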
+  
+  /**
+   * A class for comparing the estimated time to completion of two tasks
+   */
+  private static class EstimatedTimeLeftComparator 
+  implements Comparator<TaskInProgress> {
+    private long time;
+    public EstimatedTimeLeftComparator(long now) {
+      this.time = now;
+    }
+    /**
+     * Estimated time to completion is measured as:
+     *   % of task left to complete (1 - progress) / progress rate of the task.
+     * 
+     * This assumes that tasks are linear in their progress, which is 
+     * often wrong, especially since progress for reducers is currently

[... 1246 lines stripped ...]

