hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r641610 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/
Date Wed, 26 Mar 2008 22:26:40 GMT
Author: omalley
Date: Wed Mar 26 15:26:36 2008
New Revision: 641610

URL: http://svn.apache.org/viewvc?rev=641610&view=rev
Log:
HADOOP-2119.  Optimize scheduling of jobs with large numbers of
tasks by replacing static arrays with lists of runnable tasks. 
Contributed by Amar Kamat.
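
The core of the change: instead of re-scanning a static TaskInProgress[] on every tracker heartbeat, JobInProgress now keeps explicit collections of non-running and running TIPs and pulls candidates from them directly. A minimal, self-contained sketch of that idea follows; Task, RunnableTaskLists and the method names are illustrative stand-ins, not the actual JobInProgress API.

import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

/**
 * Minimal sketch of the data-structure change: Task and RunnableTaskLists are
 * stand-ins for TaskInProgress and the collections JobInProgress now keeps
 * (per-node caches, non-local lists, reduce lists).
 */
public class RunnableTaskLists {
  static class Task {
    final int id;
    boolean running;
    boolean complete;
    Task(int id) { this.id = id; }
  }

  // Keep schedulable tasks in a list and running ones in a set, instead of
  // rescanning a static Task[] on every tracker heartbeat.
  private final List<Task> nonRunning = new LinkedList<Task>();
  private final Set<Task> running = new LinkedHashSet<Task>();

  void add(Task t) { nonRunning.add(t); }

  /** Pop the next schedulable task without scanning the whole job. */
  Task obtainNewTask() {
    while (!nonRunning.isEmpty()) {
      Task t = nonRunning.remove(0);
      if (!t.complete && !t.running) {
        t.running = true;
        running.add(t);
        return t;
      }
    }
    return null;
  }

  /** A failed task goes back to the head of the list so it is retried early. */
  void fail(Task t) {
    running.remove(t);
    t.running = false;
    nonRunning.add(0, t);
  }

  void complete(Task t) {
    running.remove(t);
    t.complete = true;
  }
}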

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=641610&r1=641609&r2=641610&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 26 15:26:36 2008
@@ -181,6 +181,10 @@
     HADOOP-2902.  Replace uses of "fs.default.name" with calls to the
     accessor methods added in HADOOP-1967.  (cutting)
 
+    HADOOP-2119.  Optimize scheduling of jobs with large numbers of
+    tasks by replacing static arrays with lists of runnable tasks. 
+    (Amar Kamat via omalley)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=641610&r1=641609&r2=641610&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Mar 26 15:26:36 2008
@@ -1899,15 +1899,12 @@
     public void run() {
       try {
         while (fsRunning) {
-          int size;
-          if((size = resolutionQueue.size()) == 0) {
-            Thread.sleep(2000);
-            continue;
-          }
           List <DatanodeDescriptor> datanodes = 
-            new ArrayList<DatanodeDescriptor>(size);
+            new ArrayList<DatanodeDescriptor>(resolutionQueue.size());
+          // Block if the queue is empty
+          datanodes.add(resolutionQueue.take());
           resolutionQueue.drainTo(datanodes);
-          List<String> dnHosts = new ArrayList<String>(size);
+          List<String> dnHosts = new ArrayList<String>(datanodes.size());
           for (DatanodeDescriptor d : datanodes) {
             dnHosts.add(d.getName());
           }
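
The hunk above (and the matching one in JobTracker's ResolutionThread further down) replaces a sleep-and-poll loop with the standard BlockingQueue idiom: block on take() for the first element, then drainTo() whatever else is already queued. A standalone sketch of that idiom, using illustrative names (BatchingConsumer, process()) rather than the FSNamesystem types:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Standalone sketch of the take()-then-drainTo() consumer idiom. */
public class BatchingConsumer implements Runnable {
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

  public void submit(String host) throws InterruptedException {
    queue.put(host);
  }

  public void run() {
    try {
      while (true) {
        List<String> batch = new ArrayList<String>();
        // Block until at least one element arrives -- no sleep/poll loop.
        batch.add(queue.take());
        // Then grab everything else already queued, without blocking again.
        queue.drainTo(batch);
        process(batch);
      }
    } catch (InterruptedException ie) {
      // interrupt is the shutdown signal, as in the threads patched here
    }
  }

  private void process(List<String> batch) {
    System.out.println("Resolving a batch of " + batch.size() + " hosts");
  }
}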

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=641610&r1=641609&r2=641610&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Mar 26 15:26:36 2008
@@ -20,10 +20,14 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.Vector;
 
@@ -77,8 +81,23 @@
   JobTracker jobtracker = null;
 
   // NetworkTopology Node to the set of TIPs
-  Map<Node, List<TaskInProgress>> nodesToMaps;
+  Map<Node, List<TaskInProgress>> nonRunningMapCache;
   
+  // Map of NetworkTopology Node to set of running TIPs
+  Map<Node, Set<TaskInProgress>> runningMapCache;
+
+  // A list of non-local non-running maps
+  List<TaskInProgress> nonLocalMaps;
+
+  // A set of non-local running maps
+  Set<TaskInProgress> nonLocalRunningMaps;
+
+  // A list of non-running reduce TIPs
+  List<TaskInProgress> nonRunningReduces;
+
+  // A set of running reduce TIPs
+  Set<TaskInProgress> runningReduces;
+
   private int maxLevel;
   
   private int taskCompletionEventTracker = 0; 
@@ -183,6 +202,12 @@
     this.jobMetrics.setTag("jobId", jobid);
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.maxLevel = jobtracker.getNumTaskCacheLevels();
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
   }
 
   /**
@@ -215,12 +240,6 @@
     jobMetrics.remove();
   }
     
-  private Node getParentNode(Node node, int level) {
-    for (int i = 0; node != null && i < level; i++) {
-      node = node.getParent();
-    }
-    return node;
-  }
   private void printCache (Map<Node, List<TaskInProgress>> cache) {
     LOG.info("The taskcache info:");
     for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
@@ -238,18 +257,16 @@
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
     for (int i = 0; i < splits.length; i++) {
-      for(String host: splits[i].getLocations()) {
+      String[] splitLocations = splits[i].getLocations();
+      if (splitLocations.length == 0) {
+        nonLocalMaps.add(maps[i]);
+        continue;
+      }
+
+      for(String host: splitLocations) {
         Node node = jobtracker.resolveAndAddToTopology(host);
-        if (node == null) {
-          continue;
-        }
-        if (node.getLevel() < maxLevel) {
-          LOG.warn("Got a host whose level is: " + node.getLevel() +
-              ". Should get at least a level of value: " + maxLevel);
-          return null;
-        }
         for (int j = 0; j < maxLevel; j++) {
-          node = getParentNode(node, j);
+          node = JobTracker.getParentNode(node, j);
           List<TaskInProgress> hostMaps = cache.get(node);
           if (hostMaps == null) {
             hostMaps = new ArrayList<TaskInProgress>();
@@ -300,7 +317,7 @@
                                    jobtracker, conf, this, i);
     }
     if (numMapTasks > 0) { 
-      nodesToMaps = createCache(splits, (maxLevel = jobtracker.getNumTaskCacheLevels()));
+      nonRunningMapCache = createCache(splits, maxLevel);
     }
         
     // if no split is returned, job is considered completed and successful
@@ -331,6 +348,7 @@
       reduces[i] = new TaskInProgress(jobId, jobFile, 
                                       numMapTasks, i, 
                                       jobtracker, conf, this);
+      nonRunningReduces.add(reduces[i]);
     }
 
     // create job specific temporary directory in output path
@@ -633,8 +651,7 @@
     }
     
     
-    int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, nodesToMaps, hasSpeculativeMaps);
+    int target = findNewMapTask(tts, clusterSize, status.mapProgress());
     if (target == -1) {
       return null;
     }
@@ -669,8 +686,7 @@
       return null;
     }
 
-    int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
-                             reduces, null, hasSpeculativeReduces);
+    int  target = findNewReduceTask(tts, clusterSize, status.reduceProgress());
     if (target == -1) {
       return null;
     }
@@ -755,156 +771,507 @@
       new TreeMap<String, Integer>(trackerToFailuresMap);
     return trackerErrors;
   }
+
+  /**
+   * Remove a map TIP from the lists for running maps.
+   * Called when a map fails/completes (note if a map is killed,
+   * it won't be present in the list since it was completed earlier)
+   * @param tip the tip that needs to be retired
+   */
+  private synchronized void retireMap(TaskInProgress tip) {
+    // Since a list for running maps is maintained if speculation is 'ON'
+    if (hasSpeculativeMaps) {
+      if (runningMapCache == null) {
+        LOG.warn("Running cache for maps missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      String[] splitLocations = tip.getSplitLocations();
+
+      // Remove the TIP from the list for running non-local maps
+      if (splitLocations.length == 0) {
+        nonLocalRunningMaps.remove(tip);
+        return;
+      }
+
+      // Remove from the running map caches
+      for(String host: splitLocations) {
+        Node node = jobtracker.getNode(host);
+        
+        for (int j = 0; j < maxLevel; ++j) {
+          Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+          if (hostMaps != null) {
+            hostMaps.remove(tip);
+            if (hostMaps.size() == 0) {
+              runningMapCache.remove(node);
+            }
+          }
+          node = node.getParent();
+        }
+      }
+    }
+  }
+
+  /**
+   * Remove a reduce TIP from the list for running-reduces
+   * Called when a reduce fails/completes
+   * @param tip the tip that needs to be retired
+   */
+  private synchronized void retireReduce(TaskInProgress tip) {
+    // Since a list for running reduces is maintained if speculation is 'ON'
+    if (hasSpeculativeReduces) {
+      if (runningReduces == null) {
+        LOG.warn("Running list for reducers missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      runningReduces.remove(tip);
+    }
+  }
+
+  /**
+   * Adds a map tip to the list of running maps.
+   * @param tip the tip that needs to be scheduled as running
+   */
+  private synchronized void scheduleMap(TaskInProgress tip) {
     
-  private boolean shouldRunSpeculativeTask(long currentTime,
-                                           TaskInProgress task, 
-                                           double avgProgress,
-                                           String taskTracker) {
-    return task.hasSpeculativeTask(currentTime, avgProgress) && 
-           !task.hasRunOnMachine(taskTracker);
+    // Since a running list is maintained only if speculation is 'ON'
+    if (hasSpeculativeMaps) {
+      if (runningMapCache == null) {
+        LOG.warn("Running cache for maps is missing!! " 
+                 + "Job details are missing.");
+        return;
+      }
+      String[] splitLocations = tip.getSplitLocations();
+
+      // Add the TIP to the list of non-local running TIPs
+      if (splitLocations.length == 0) {
+        nonLocalRunningMaps.add(tip);
+        return;
+      }
+
+      for(String host: splitLocations) {
+        Node node = jobtracker.getNode(host);
+      
+        for (int j = 0; j < maxLevel; ++j) {
+          Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+          if (hostMaps == null) {
+            // create a cache if needed
+            hostMaps = new LinkedHashSet<TaskInProgress>();
+            runningMapCache.put(node, hostMaps);
+          }
+          hostMaps.add(tip);
+          node = node.getParent();
+        }
+      }
+    }
   }
   
   /**
-   * Find a new task to run.
+   * Adds a reduce tip to the list of running reduces
+   * @param tip the tip that needs to be scheduled as running
+   */
+  private synchronized void scheduleReduce(TaskInProgress tip) {
+    // Since a list for running reduces is maintained if speculation is 'ON'
+    if (hasSpeculativeReduces) {
+      if (runningReduces == null) {
+        LOG.warn("Running cache for reducers missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      runningReduces.add(tip);
+    }
+  }
+  
+  /**
+   * Adds the failed TIP in the front of the list for non-running maps
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failMap(TaskInProgress tip) {
+    if (nonRunningMapCache == null) {
+      LOG.warn("Non-running cache for maps missing!! "
+               + "Job details are missing.");
+      return;
+    }
+
+    // 1. It's added everywhere since other nodes (having this split local)
+    //    might have removed this tip from their local cache
+    // 2. Give high priority to failed tip - fail early
+
+    String[] splitLocations = tip.getSplitLocations();
+
+    // Add the TIP in the front of the list for non-local non-running maps
+    if (splitLocations.length == 0) {
+      nonLocalMaps.add(0, tip);
+      return;
+    }
+
+    for(String host: splitLocations) {
+      Node node = jobtracker.getNode(host);
+      
+      for (int j = 0; j < maxLevel; ++j) {
+        List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
+        if (hostMaps == null) {
+          hostMaps = new LinkedList<TaskInProgress>();
+          nonRunningMapCache.put(node, hostMaps);
+        }
+        hostMaps.add(0, tip);
+        node = node.getParent();
+      }
+    }
+  }
+  
+  /**
+   * Adds a failed TIP in the front of the list for non-running reduces
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failReduce(TaskInProgress tip) {
+    if (nonRunningReduces == null) {
+      LOG.warn("Failed cache for reducers missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    nonRunningReduces.add(0, tip);
+  }
+  
+  /**
+   * Find a non-running task in the passed list of TIPs
+   * @param tips a collection of TIPs
+   * @param taskTracker the tracker that has requested a task to run
+   * @param removeFailedTip whether to remove the failed tips
+   */
+  private synchronized TaskInProgress findTaskFromList(
+      Collection<TaskInProgress> tips, String taskTracker, boolean removeFailedTip) {
+    Iterator<TaskInProgress> iter = tips.iterator();
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+
+      // Select a tip if
+      //   1. runnable   : still needs to be run and is not completed
+      //   2. ~running   : no other node is running it
+      //   3. earlier attempt failed : has not failed on this host
+      //                               and has failed on all the other hosts
+      // A TIP is removed from the list if 
+      // (1) this tip is scheduled
+      // (2) if the passed list is a level 0 (host) cache
+      // (3) when the TIP is non-schedulable (running, killed, complete)
+      if (tip.isRunnable() && !tip.isRunning()) {
+        // check if the tip has failed on this host
+        if (!tip.hasFailedOnMachine(taskTracker) || 
+             tip.getNumberOfFailedMachines() >= clusterSize) {
+          // check if the tip has failed on all the nodes
+          iter.remove();
+          return tip;
+        } else if (removeFailedTip) { 
+          // the case where we want to remove a failed tip from the host cache
+          // point#3 in the TIP removal logic above
+          iter.remove();
+        }
+      } else {
+        // see point#3 in the comment above for TIP removal logic
+        iter.remove();
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Find a speculative task
+   * @param list a list of tips
+   * @param taskTracker the tracker that has requested a tip
+   * @param avgProgress the average progress for speculation
+   * @param currentTime current time in milliseconds
+   * @param shouldRemove whether to remove the tips
+   * @return a tip that can be speculated on the tracker
+   */
+  private synchronized TaskInProgress findSpeculativeTask(
+      Collection<TaskInProgress> list, String taskTracker, double avgProgress,
+      long currentTime, boolean shouldRemove) {
+    
+    Iterator<TaskInProgress> iter = list.iterator();
+
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+      // should never be true! (since we delete completed/failed tasks)
+      if (!tip.isRunning()) {
+        iter.remove();
+        continue;
+      }
+
+      if (!tip.hasRunOnMachine(taskTracker)) {
+        if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
+          // In the case of a shared list we don't remove it, since a TIP that
+          // failed on this tracker can still be scheduled on some other tracker.
+          if (shouldRemove) {
+            iter.remove(); //this tracker is never going to run it again
+          }
+          return tip;
+        } 
+      } else {
+        // Check if this tip can be removed from the list.
+        // If the list is shared then we should not remove.
+        if (shouldRemove) {
+          // This tracker will never speculate this tip
+          iter.remove();
+        }
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Find new map task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param avgProgress The average progress of this kind of task in this job
-   * @param tasks The list of potential tasks to try
-   * @param firstTaskToTry The first index in tasks to check
-   * @param cachedTasks A list of tasks that would like to run on this node
-   * @param hasSpeculative Should it try to find speculative tasks
    * @return the index in tasks of the selected task (or -1 for no task)
    */
-  private int findNewTask(TaskTrackerStatus tts, 
-                          int clusterSize,
-                          double avgProgress,
-                          TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks,
-                          boolean hasSpeculative) {
+  private synchronized int findNewMapTask(TaskTrackerStatus tts, 
+                                          int clusterSize,
+                                          double avgProgress) {
     String taskTracker = tts.getTrackerName();
-    int specTarget = -1;
-
+    TaskInProgress tip = null;
+    
     //
     // Update the last-known clusterSize
     //
     this.clusterSize = clusterSize;
 
-    Node node = jobtracker.getNode(tts.getHost());
-    //
-    // Check if too many tasks of this job have failed on this
-    // tasktracker prior to assigning it a new one.
-    //
-    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
-    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
-        taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
-      if (LOG.isDebugEnabled()) {
-        String flakyTracker = convertTrackerNameToHostName(taskTracker); 
-        LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
-                + "' for assigning a new task");
-      }
+    if (!shouldRunOnTaskTracker(taskTracker)) {
       return -1;
     }
-    long currentTime = System.currentTimeMillis();
-        
-    //
-    // See if there is a split over a block that is stored on
-    // the TaskTracker checking in or the rack it belongs to and so on till
-    // maxLevel.  That means the block
-    // doesn't have to be transmitted from another node/rack/and so on.
-    // The way the cache is updated is such that in every lookup, the TIPs
-    // which are complete is removed. Running/Failed TIPs are not removed
-    // since we want to have locality optimizations even for FAILED/SPECULATIVE
-    // tasks.
+
+    Node node = jobtracker.getNode(tts.getHost());
+    
+    // For scheduling a map task, we have two caches and a list (optional)
+    //  I)   one for non-running task
+    //  II)  one for running task (this is for handling speculation)
+    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
+    //       the list is empty if all TIPs have associated locations
+
+    // First a look up is done on the non-running cache and on a miss, a look 
+    // up is done on the running cache. The order for lookup within the cache:
+    //   1. from local node to root [bottom up]
+    //   2. breadth wise for all the parent nodes at max level
+
+    // We fall to linear scan of the list (III above) if we have misses in the 
+    // above caches
+
     //
-    if (cachedTasks != null && node != null) {
+    // I) Non-running TIP :
+    // 
+
+    // 1. check from local node to the root [bottom up cache lookup]
+    //    i.e if the cache is available and the host has been resolved
+    //    (node!=null)
+    
+    if (node != null) {
       Node key = node;
-      for (int level = 0; level < maxLevel && key != null; level++) {
-        List <TaskInProgress> cacheForLevel = cachedTasks.get(key);
+      for (int level = 0; level < maxLevel; ++level) {
+        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
         if (cacheForLevel != null) {
-          Iterator<TaskInProgress> i = cacheForLevel.iterator();
-          while (i.hasNext()) {
-            TaskInProgress tip = i.next();
-            // we remove only those TIPs that are data-local (the host having
-            // the data is running the task). We don't remove TIPs that are 
-            // rack-local for example since that would negatively impact
-            // the performance of speculative and failed tasks (imagine a case
-            // where we schedule one TIP rack-local and after sometime another
-            // tasktracker from the same rack is asking for a task, and the TIP
-            // in question has either failed or could be a speculative task
-            // candidate)
-            if (tip.isComplete() || level == 0) {
-              i.remove();
-            }
-            if (tip.isRunnable() && 
-                !tip.isRunning() &&
-                !tip.hasFailedOnMachine(taskTracker)) {
-              int cacheTarget = tip.getIdWithinJob();
-              if (level == 0) {
-                LOG.info("Choosing data-local task " + tip.getTIPId());
-                jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
-              } else if (level == 1){
-                LOG.info("Choosing rack-local task " + tip.getTIPId());
-                jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
-              } else {
-                LOG.info("Choosing cached task at level " + level + " " + 
-                          tip.getTIPId());
-              }
-              return cacheTarget;
+          tip = findTaskFromList(cacheForLevel, taskTracker, level == 0);
+          if (tip != null) {
+            // Add to running cache
+            scheduleMap(tip);
+
+            // remove the cache if its empty
+            if (cacheForLevel.size() == 0) {
+              nonRunningMapCache.remove(key);
             }
-            if (hasSpeculative && specTarget == -1 &&
-                shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
-                                         taskTracker)) {
-              specTarget = tip.getIdWithinJob();
+
+            if (level == 0) {
+              LOG.info("Choosing data-local task " + tip.getTIPId());
+              jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+            } else if (level == 1){
+              LOG.info("Choosing rack-local task " + tip.getTIPId());
+              jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+            } else {
+              LOG.info("Choosing cached task at level " + level 
+                       + tip.getTIPId());
             }
+
+            return tip.getIdWithinJob();
           }
         }
         key = key.getParent();
       }
     }
 
+    //2. Search breadth-wise across parents at max level for non-running 
+    //   TIP if
+    //     - cache exists and there is a cache miss 
+    //     - node information for the tracker is missing (tracker's topology
+    //       info not obtained yet)
+
+    // get the node parent at max level
+    Node nodeParentAtMaxLevel = JobTracker.getParentNode(node, maxLevel - 1);
+    // collection of node at max level in the cache structure
+    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
+    
+    for (Node parent : nodesAtMaxLevel) {
+
+      // skip the parent that has already been scanned
+      if (parent == nodeParentAtMaxLevel) {
+        continue;
+      }
+
+      List<TaskInProgress> cache = nonRunningMapCache.get(parent);
+      if (cache != null) {
+        tip = findTaskFromList(cache, taskTracker, false);
+        if (tip != null) {
+          // Add to the running cache
+          scheduleMap(tip);
+
+          // remove the cache if empty
+          if (cache.size() == 0) {
+            nonRunningMapCache.remove(parent);
+          }
+          LOG.info("Choosing a non-local task " + tip.getTIPId());
+          return tip.getIdWithinJob();
+        }
+      }
+    }
+
+    // 3. Search non-local tips for a new task
+    tip = findTaskFromList(nonLocalMaps, taskTracker, false);
+    if (tip != null) {
+      // Add to the running list
+      scheduleMap(tip);
+
+      LOG.info("Choosing a non-local task " + tip.getTIPId());
+      return tip.getIdWithinJob();
+    }
 
     //
-    // If there's no cached target, see if there's
-    // a std. task to run.
-    //
-    int failedTarget = -1;
-    for (int i = 0; i < tasks.length; i++) {
-      TaskInProgress task = tasks[i];
-      if (task.isRunnable()) {
-        // if it failed here and we haven't tried every machine, we
-        // don't schedule it here.
-        boolean hasFailed = task.hasFailedOnMachine(taskTracker);
-        if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
+    // II) Running TIP :
+    // 
+ 
+    if (hasSpeculativeMaps) {
+      long currentTime = System.currentTimeMillis();
+
+      // 1. Check bottom up for speculative tasks from the running cache
+      if (node != null) {
+        Node key = node;
+        for (int level = 0; level < maxLevel; ++level) {
+          Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
+          if (cacheForLevel != null) {
+            tip = findSpeculativeTask(cacheForLevel, taskTracker, 
+                                      avgProgress, currentTime, level == 0);
+            if (tip != null) {
+              if (cacheForLevel.size() == 0) {
+                runningMapCache.remove(key);
+              }
+              if (level == 0) {
+                LOG.info("Choosing a data-local task " + tip.getTIPId() 
+                         + " for speculation");
+              } else if (level == 1){
+                LOG.info("Choosing a rack-local task " + tip.getTIPId() 
+                         + " for speculation");
+              } else {
+                LOG.info("Choosing a cached task at level " + level
+                         + tip.getTIPId() + " for speculation");
+              }
+              return tip.getIdWithinJob();
+            }
+          }
+          key = key.getParent();
+        }
+      }
+
+      // 2. Check breadth-wise for speculative tasks
+      
+      for (Node parent : nodesAtMaxLevel) {
+        // ignore the parent which is already scanned
+        if (parent == nodeParentAtMaxLevel) {
           continue;
         }
-        boolean isRunning = task.isRunning();
-        if (hasFailed) {
-          // failed tasks that aren't running can be scheduled as a last
-          // resort
-          if (!isRunning && failedTarget == -1) {
-            failedTarget = i;
-          }
-        } else {
-          if (!isRunning) {
-            LOG.info("Choosing normal task " + tasks[i].getTIPId());
-            return i;
-          } else if (hasSpeculative && specTarget == -1 &&
-                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
-                                              taskTracker)) {
-            specTarget = i;
+
+        Set<TaskInProgress> cache = runningMapCache.get(parent);
+        if (cache != null) {
+          tip = findSpeculativeTask(cache, taskTracker, avgProgress, 
+                                    currentTime, false);
+          if (tip != null) {
+            // remove empty cache entries
+            if (cache.size() == 0) {
+              runningMapCache.remove(parent);
+            }
+            LOG.info("Choosing a non-local task " + tip.getTIPId() 
+                     + " for speculation");
+            return tip.getIdWithinJob();
           }
         }
       }
+
+      // 3. Check non-local tips for speculation
+      tip = findSpeculativeTask(nonLocalRunningMaps, taskTracker, avgProgress, 
+                                currentTime, false);
+      if (tip != null) {
+        LOG.info("Choosing a non-local task " + tip.getTIPId() 
+                 + " for speculation");
+        return tip.getIdWithinJob();
+      }
     }
-    if (specTarget != -1) {
-      LOG.info("Choosing speculative task " + 
-               tasks[specTarget].getTIPId());
-    } else if (failedTarget != -1) {
-      LOG.info("Choosing failed task " + 
-               tasks[failedTarget].getTIPId());          
-    }
+    return -1;
+  }
+
+  /**
+   * Find new reduce task
+   * @param tts The task tracker that is asking for a task
+   * @param clusterSize The number of task trackers in the cluster
+   * @param avgProgress The average progress of this kind of task in this job
+   * @return the index in tasks of the selected task (or -1 for no task)
+   */
+  private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
+                                             int clusterSize,
+                                             double avgProgress) {
+    String taskTracker = tts.getTrackerName();
+    TaskInProgress tip = null;
     
-    return specTarget != -1 ? specTarget : failedTarget;
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return -1;
+    }
+
+    // 1. check for a never-executed reduce tip
+    // reducers don't have a cache and so pass -1 to explicitly call that out
+    tip = findTaskFromList(nonRunningReduces, taskTracker, false);
+    if (tip != null) {
+      scheduleReduce(tip);
+      return tip.getIdWithinJob();
+    }
+
+    // 2. check for a reduce tip to be speculated
+    if (hasSpeculativeReduces) {
+      tip = findSpeculativeTask(runningReduces, taskTracker, avgProgress, 
+                                System.currentTimeMillis(), false);
+      if (tip != null) {
+        scheduleReduce(tip);
+        return tip.getIdWithinJob();
+      }
+    }
+
+    return -1;
+  }
+  
+  private boolean shouldRunOnTaskTracker(String taskTracker) {
+    //
+    // Check if too many tasks of this job have failed on this
+    // tasktracker prior to assigning it a new one.
+    //
+    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
+    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
+        taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
+      if (LOG.isDebugEnabled()) {
+        String flakyTracker = convertTrackerNameToHostName(taskTracker); 
+        LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
+                  + "' for assigning a new task");
+      }
+      return false;
+    }
+    return true;
   }
 
   /**
@@ -969,10 +1336,14 @@
       runningMapTasks -= 1;
       finishedMapTasks += 1;
       metrics.completeMap();
+      // remove the completed map from the resp running caches
+      retireMap(tip);
     } else{
       runningReduceTasks -= 1;
       finishedReduceTasks += 1;
       metrics.completeReduce();
+      // remove the completed reduces from the running reducers set
+      retireReduce(tip);
     }
         
     //
@@ -999,23 +1370,15 @@
    * @return
    */
   private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
-    boolean allDone = true;
-    for (int i = 0; i < maps.length; i++) {
-      if (!(maps[i].isComplete() || maps[i].isFailed())) {
-        allDone = false;
-        break;
-      }
-    }
+    // Job is complete if total-tips = finished-tips + failed-tips
+    boolean allDone = 
+      ((finishedMapTasks + failedMapTIPs) == numMapTasks);
     if (allDone) {
       if (tip.isMapTask()) {
         this.status.setMapProgress(1.0f);              
       }
-      for (int i = 0; i < reduces.length; i++) {
-        if (!(reduces[i].isComplete() || reduces[i].isFailed())) {
-          allDone = false;
-          break;
-        }
-      }
+      allDone = 
+        ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
     }
 
     //
@@ -1079,7 +1442,9 @@
                           TaskStatus status, String trackerName,
                           boolean wasRunning, boolean wasComplete,
                           JobTrackerMetrics metrics) {
-    
+    // check if the TIP is already failed
+    boolean wasFailed = tip.isFailed();
+
     // Mark the taskid as FAILED or KILLED
     tip.incompleteSubTask(taskid, trackerName, this.status);
    
@@ -1090,8 +1455,20 @@
     if (wasRunning && !isRunning) {
       if (tip.isMapTask()){
         runningMapTasks -= 1;
+        // remove from the running queue and put it in the non-running cache
+        // if the tip is not complete i.e if the tip still needs to be run
+        if (!isComplete) {
+          retireMap(tip);
+          failMap(tip);
+        }
       } else {
         runningReduceTasks -= 1;
+        // remove from the running queue and put in the failed queue if the tip
+        // is not complete
+        if (!isComplete) {
+          retireReduce(tip);
+          failReduce(tip);
+        }
       }
     }
         
@@ -1100,25 +1477,12 @@
       if (tip.isMapTask()) {
         // Put the task back in the cache. This will help locality for cases
         // where we have a different TaskTracker from the same rack/switch
-        // asking for a task. Note that we don't add the TIP to the host's cache
-        // again since we don't execute a failed TIP on the same TT again, 
-        // and also we bother about only those TIPs that were successful
+        // asking for a task. 
+        // We bother about only those TIPs that were successful
         // earlier (wasComplete and !isComplete) 
         // (since they might have been removed from the cache of other 
         // racks/switches, if the input split blocks were present there too)
-        for (String host : tip.getSplitLocations()) {
-          Node node = jobtracker.getNode(host);
-          for (int level = 1; (node != null && level < maxLevel); level++) {
-            node = getParentNode(node, level);
-            if (node == null) {
-              break;
-            }
-            List<TaskInProgress> list = nodesToMaps.get(node);
-            if (list != null) {
-              list.add(tip);
-            }
-          }
-        }
+        failMap(tip);
         finishedMapTasks -= 1;
       }
     }
@@ -1176,8 +1540,9 @@
     //
     // Check if we need to kill the job because of too many failures or 
     // if the job is complete since all component tasks have completed
-    //
-    if (tip.isFailed()) {
+
+    // We do it once per TIP and that too for the task that fails the TIP
+    if (!wasFailed && tip.isFailed()) {
       //
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
@@ -1285,20 +1650,28 @@
     }
     
     cleanUpMetrics();
+    // free up the memory used by the data structures
+    this.nonRunningMapCache = null;
+    this.runningMapCache = null;
+    this.nonRunningReduces = null;
+    this.runningReduces = null;
   }
 
   /**
    * Return the TaskInProgress that matches the tipid.
    */
   public TaskInProgress getTaskInProgress(String tipid){
-    for (int i = 0; i < maps.length; i++) {
-      if (tipid.equals(maps[i].getTIPId())){
-        return maps[i];
-      }               
-    }
-    for (int i = 0; i < reduces.length; i++) {
-      if (tipid.equals(reduces[i].getTIPId())){
-        return reduces[i];
+    if (TaskInProgress.isMapId(tipid)) {
+      for (int i = 0; i < maps.length; i++) {
+        if (tipid.equals(maps[i].getTIPId())){
+          return maps[i];
+        }
+      }
+    } else {
+      for (int i = 0; i < reduces.length; i++) {
+        if (tipid.equals(reduces[i].getTIPId())){
+          return reduces[i];
+        }
       }
     }
     return null;
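
The new findNewMapTask() above looks up candidates bottom-up, from the tracker's node towards the root of the network topology, before falling back to a breadth-wise scan of other racks and finally the non-local list. A simplified sketch of just that bottom-up lookup; Node, getParent() and the cache map are stand-ins for the NetworkTopology types used in the patch:

import java.util.List;
import java.util.Map;

/** Simplified sketch of the bottom-up locality lookup in findNewMapTask(). */
class LocalityLookup<T> {
  interface Node { Node getParent(); }

  /**
   * Walk from the tracker's node up to maxLevel levels and return the first
   * cached task found: level 0 is data-local, level 1 rack-local, and so on.
   * Returns null on a miss; the caller then falls back to other racks and
   * finally to the non-local list.
   */
  T findCachedTask(Node trackerNode, Map<Node, List<T>> cache, int maxLevel) {
    Node key = trackerNode;
    for (int level = 0; level < maxLevel && key != null; ++level) {
      List<T> tasksForLevel = cache.get(key);
      if (tasksForLevel != null && !tasksForLevel.isEmpty()) {
        return tasksForLevel.remove(0);
      }
      key = key.getParent();
    }
    return null;
  }
}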

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=641610&r1=641609&r2=641610&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Mar 26 15:26:36 2008
@@ -37,6 +37,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.HashMap;
 import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -86,7 +87,8 @@
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private ResolutionThread resThread = new ResolutionThread();
-  private int numTaskCacheLevels; // the max level of a host in the network topology
+  private int numTaskCacheLevels; // the max level to which we cache tasks
+  private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
   
   /**
    * A client tried to submit a job before the Job Tracker was ready.
@@ -556,8 +558,8 @@
   Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = 
     new TreeMap<String, HeartbeatResponse>();
 
-  // (trackerHostName --> Node (NetworkTopology))
-  Map<String, Node> trackerNameToNodeMap = 
+  // (hostname --> Node (NetworkTopology))
+  Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
   
   // Number of resolved entries
@@ -1171,20 +1173,54 @@
     List <String> tmpList = new ArrayList<String>(1);
     tmpList.add(name);
     List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
-    if (rNameList == null || rNameList.size() == 0) {
-      return null;
-    }
     String rName = rNameList.get(0);
     String networkLoc = NodeBase.normalize(rName);
-    Node node = null;
-    if ((node = clusterMap.getNode(networkLoc+"/"+name)) == null) {
-      node = new NodeBase(name, networkLoc);
+    return addHostToNodeMapping(name, networkLoc);
+  }
+  
+  private Node addHostToNodeMapping(String host, String networkLoc) {
+    Node node;
+    if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
+      node = new NodeBase(host, networkLoc);
       clusterMap.add(node);
+      if (node.getLevel() < getNumTaskCacheLevels()) {
+        LOG.fatal("Got a host whose level is: " + node.getLevel() + "." 
+                  + " Should get at least a level of value: " 
+                  + getNumTaskCacheLevels());
+        try {
+          stopTracker();
+        } catch (IOException ie) {
+          LOG.warn("Exception encountered during shutdown: " 
+                   + StringUtils.stringifyException(ie));
+          System.exit(-1);
+        }
+      }
+      hostnameToNodeMap.put(host, node);
+      // Make an entry for the node at the max level in the cache
+      nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
+    }
+    return node;
+  }
+
+  /**
+   * Returns a collection of nodes at the max level
+   */
+  public Collection<Node> getNodesAtMaxLevel() {
+    return nodesAtMaxLevel;
+  }
+
+  public static Node getParentNode(Node node, int level) {
+    for (int i = 0; i < level; ++i) {
+      node = node.getParent();
     }
     return node;
   }
+
+  /**
+   * Return the Node in the network topology that corresponds to the hostname
+   */
   public Node getNode(String name) {
-    return trackerNameToNodeMap.get(name);
+    return hostnameToNodeMap.get(name);
   }
   public int getNumTaskCacheLevels() {
     return numTaskCacheLevels;
@@ -1421,15 +1457,12 @@
     public void run() {
       while (!isInterrupted()) {
         try {
-          int size;
-          if((size = queue.size()) == 0) {
-            Thread.sleep(1000);
-            continue;
-          }
           List <TaskTrackerStatus> statuses = 
-            new ArrayList<TaskTrackerStatus>(size);
+            new ArrayList<TaskTrackerStatus>(queue.size());
+          // Block if the queue is empty
+          statuses.add(queue.take());
           queue.drainTo(statuses);
-          List<String> dnHosts = new ArrayList<String>(size);
+          List<String> dnHosts = new ArrayList<String>(statuses.size());
           for (TaskTrackerStatus t : statuses) {
             dnHosts.add(t.getHost());
           }
@@ -1441,12 +1474,7 @@
           for (String m : rName) {
             String host = statuses.get(i++).getHost();
             String networkLoc = NodeBase.normalize(m);
-            Node node = null;
-            if (clusterMap.getNode(networkLoc+"/"+host) == null) {
-              node = new NodeBase(host, networkLoc);
-              clusterMap.add(node);
-              trackerNameToNodeMap.put(host, node);
-            }
+            addHostToNodeMapping(host, networkLoc);
             numResolved++;
           }
         } catch (InterruptedException ie) {
@@ -2102,113 +2130,155 @@
     }
     
     public void addToQueue(JobInProgress.JobWithTaskContext j) {
-      while (!queue.add(j)) {
-        LOG.warn("Couldn't add to the Task Commit queue now. Will " +
-                 "try again");
+      while (true) { // loop until the element gets added
         try {
-          Thread.sleep(2000);
+          queue.put(j);
+          return;
         } catch (InterruptedException ie) {}
       }
     }
        
     public void run() {
+      int  batchCommitSize = conf.getInt("jobtracker.task.commit.batch.size", 
+                                         5000); 
       while (!isInterrupted()) {
         try {
-          JobInProgress.JobWithTaskContext j = queue.take();
-          JobInProgress job = j.getJob();
-          TaskInProgress tip = j.getTIP();
-          String taskid = j.getTaskId();
-          JobTrackerMetrics metrics = j.getJobTrackerMetrics();
-          Task t;
-          TaskStatus status;
-          boolean isTipComplete = false;
-          TaskStatus.State state;
+          ArrayList <JobInProgress.JobWithTaskContext> jobList = 
+            new ArrayList<JobInProgress.JobWithTaskContext>(batchCommitSize);
+          // Block if the queue is empty
+          jobList.add(queue.take());  
+          queue.drainTo(jobList, batchCommitSize);
+
+          JobInProgress[] jobs = new JobInProgress[jobList.size()];
+          TaskInProgress[] tips = new TaskInProgress[jobList.size()];
+          String[] taskids = new String[jobList.size()];
+          JobTrackerMetrics[] metrics = new JobTrackerMetrics[jobList.size()];
+
+          Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
+          int count = 0;
+
+          while (iter.hasNext()) {
+            JobInProgress.JobWithTaskContext j = iter.next();
+            jobs[count] = j.getJob();
+            tips[count] = j.getTIP();
+            taskids[count]= j.getTaskId();
+            metrics[count] = j.getJobTrackerMetrics();
+            ++count;
+          }
+
+          Task[] tasks = new Task[jobList.size()];
+          TaskStatus[] status = new TaskStatus[jobList.size()];
+          boolean[] isTipComplete = new boolean[jobList.size()];
+          TaskStatus.State[] states = new TaskStatus.State[jobList.size()];
+
           synchronized (JobTracker.this) {
-            synchronized (job) {
-              synchronized (tip) {
-                status = tip.getTaskStatus(taskid);
-                t = tip.getTaskObject(taskid);
-                state = status.getRunState();
-                isTipComplete = tip.isComplete();
+            for(int i = 0; i < jobList.size(); ++i) {
+              synchronized (jobs[i]) {
+                synchronized (tips[i]) {
+                  status[i] = tips[i].getTaskStatus(taskids[i]);
+                  tasks[i] = tips[i].getTaskObject(taskids[i]);
+                  states[i] = status[i].getRunState();
+                  isTipComplete[i] = tips[i].isComplete();
+                }
               }
             }
           }
-          try {
-            //For COMMIT_PENDING tasks, we save the task output in the dfs
-            //as well as manipulate the JT datastructures to reflect a
-            //successful task. This guarantees that we don't declare a task
-            //as having succeeded until we have successfully completed the
-            //dfs operations.
-            //For failed tasks, we just do the dfs operations here. The
-            //datastructures updates is done earlier as soon as the failure
-            //is detected so that the JT can immediately schedule another
-            //attempt for that task.
-            if (state == TaskStatus.State.COMMIT_PENDING) {
-              if (!isTipComplete) {
-                t.saveTaskOutput();
+
+          //For COMMIT_PENDING tasks, we save the task output in the dfs
+          //as well as manipulate the JT datastructures to reflect a
+          //successful task. This guarantees that we don't declare a task
+          //as having succeeded until we have successfully completed the
+          //dfs operations.
+          //For failed tasks, we just do the dfs operations here. The
+          //datastructures updates is done earlier as soon as the failure
+          //is detected so that the JT can immediately schedule another
+          //attempt for that task.
+
+          Set<String> seenTIPs = new HashSet<String>();
+          for(int index = 0; index < jobList.size(); ++index) {
+            try {
+              if (states[index] == TaskStatus.State.COMMIT_PENDING) {
+                if (!isTipComplete[index]) {
+                  if (!seenTIPs.contains(tips[index].getTIPId())) {
+                    tasks[index].saveTaskOutput();
+                    seenTIPs.add(tips[index].getTIPId());
+                  } else {
+                    // since other task of this tip has saved its output
+                    isTipComplete[index] = true;
+                  }
+                }
               }
+            } catch (IOException ioe) {
+              // Oops! Failed to copy the task's output to its final place;
+              // fail the task!
+              states[index] = TaskStatus.State.FAILED;
               synchronized (JobTracker.this) {
-                //do a check for the case where after the task went to
-                //COMMIT_PENDING, it was lost. So although we would have
-                //saved the task output, we cannot declare it a SUCCESS.
-                TaskStatus newStatus = null;
-                synchronized (job) {
-                  synchronized (tip) {
-                    status = tip.getTaskStatus(taskid);
-                    if (!isTipComplete) {
-                      if (status.getRunState() != 
-                        TaskStatus.State.COMMIT_PENDING) {
-                        state = TaskStatus.State.KILLED;
+                String reason = "Failed to rename output with the exception: " 
+                                + StringUtils.stringifyException(ioe);
+                TaskStatus.Phase phase = (tips[index].isMapTask() 
+                                          ? TaskStatus.Phase.MAP 
+                                          : TaskStatus.Phase.REDUCE);
+                jobs[index].failedTask(tips[index], status[index].getTaskId(), 
+                                       reason, phase, TaskStatus.State.FAILED, 
+                                       status[index].getTaskTracker(), null);
+              }
+              LOG.info("Failed to rename the output of " 
+                       + status[index].getTaskId() + " with " 
+                       + StringUtils.stringifyException(ioe));
+            }
+          }
+
+          synchronized (JobTracker.this) {
+            //do a check for the case where after the task went to
+            //COMMIT_PENDING, it was lost. So although we would have
+            //saved the task output, we cannot declare it a SUCCESS.
+            for(int i = 0; i < jobList.size(); ++i) {
+              TaskStatus newStatus = null;
+              if(states[i] == TaskStatus.State.COMMIT_PENDING) {
+                synchronized (jobs[i]) {
+                  synchronized (tips[i]) {
+                    status[i] = tips[i].getTaskStatus(taskids[i]);
+                    if (!isTipComplete[i]) {
+                      if (status[i].getRunState() 
+                          != TaskStatus.State.COMMIT_PENDING) {
+                        states[i] = TaskStatus.State.KILLED;
                       } else {
-                        state = TaskStatus.State.SUCCEEDED;
+                        states[i] = TaskStatus.State.SUCCEEDED;
                       }
                     } else {
-                      tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
-                      "TIP");
-                      state = TaskStatus.State.KILLED;
-
-                    }
-                    //create new status if required. If the state changed from
-                    //COMMIT_PENDING to KILLED in the JobTracker, while we were
-                    //saving the output,the JT would have called updateTaskStatus
-                    //and we don't need to call it again
-                    if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
-                      newStatus = (TaskStatus)status.clone();
-                      newStatus.setRunState(state);
-                      newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
+                      tips[i].addDiagnosticInfo(tasks[i].getTaskId(), 
+                                                "Already completed  TIP");
+                      states[i] = TaskStatus.State.KILLED;
                     }
+                    //create new status if required. If the state changed 
+                    //from COMMIT_PENDING to KILLED in the JobTracker, while 
+                    //we were saving the output,the JT would have called 
+                    //updateTaskStatus and we don't need to call it again
+                    newStatus = (TaskStatus)status[i].clone();
+                    newStatus.setRunState(states[i]);
+                    newStatus.setProgress(
+                        (states[i] == TaskStatus.State.SUCCEEDED) 
+                        ? 1.0f 
+                        : 0.0f);
                   }
                   if (newStatus != null) {
-                    job.updateTaskStatus(tip, newStatus, metrics);
+                    jobs[i].updateTaskStatus(tips[i], newStatus, metrics[i]);
                   }
                 }
               }
             }
-          } catch (IOException ioe) {
-            // Oops! Failed to copy the task's output to its final place;
-            // fail the task!
-            state = TaskStatus.State.FAILED;
-            synchronized (JobTracker.this) {
-              job.failedTask(tip, status.getTaskId(), 
-                  "Failed to rename output with the exception: " + 
-                  StringUtils.stringifyException(ioe), 
-                  (tip.isMapTask() ? 
-                      TaskStatus.Phase.MAP : 
-                        TaskStatus.Phase.REDUCE), 
-                        TaskStatus.State.FAILED,  
-                        status.getTaskTracker(), null);
-            }
-            LOG.info("Failed to rename the output of " + status.getTaskId() + 
-                " with: " + StringUtils.stringifyException(ioe));
           }
-          if (state == TaskStatus.State.FAILED || 
-              state == TaskStatus.State.KILLED) {
-            try {
-              t.discardTaskOutput();
-            } catch (IOException ioe) { 
-              LOG.info("Failed to discard the output of task " + 
-                  status.getTaskId() + " with: " + 
-                  StringUtils.stringifyException(ioe));
+
+          for(int i = 0; i < jobList.size(); ++i) {
+            if (states[i] == TaskStatus.State.FAILED
+                || states[i] == TaskStatus.State.KILLED) {
+              try {
+                tasks[i].discardTaskOutput();
+              } catch (IOException ioe) {
+                LOG.info("Failed to discard the output of task " 
+                         + status[i].getTaskId() + " with: " 
+                         + StringUtils.stringifyException(ioe));
+              }
             }
           }
         } catch (InterruptedException ie) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=641610&r1=641609&r2=641610&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Mar 26 15:26:36 2008
@@ -53,6 +53,8 @@
   int maxTaskAttempts = 4;    
   static final double SPECULATIVE_GAP = 0.2;
   static final long SPECULATIVE_LAG = 60 * 1000;
+  static final String MAP_IDENTIFIER = "_m_";
+  static final String REDUCE_IDENTIFIER = "_r_";
   private static NumberFormat idFormat = NumberFormat.getInstance();
   static {
     idFormat.setMinimumIntegerDigits(6);
@@ -160,6 +162,18 @@
       this.maxTaskAttempts = conf.getMaxReduceAttempts();
     }
   }
+  
+  /**
+   * Return true if the tip id represents a map
+   * @param tipId the tip id
+   * @return whether the tip is a map tip or a reduce tip
+   */
+  public static boolean isMapId(String tipId) {
+    if (tipId.contains(MAP_IDENTIFIER))  {
+      return true;
+    }
+    return false;
+  }
 
   /**
    * Make a unique name for this TIP.
@@ -170,9 +184,9 @@
     StringBuilder result = new StringBuilder();
     result.append(uniqueBase);
     if (isMapTask()) {
-      result.append("_m_");
+      result.append(MAP_IDENTIFIER);
     } else {
-      result.append("_r_");
+      result.append(REDUCE_IDENTIFIER);
     }
     result.append(idFormat.format(partition));
     return result.toString();


