hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r512461 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
Date Tue, 27 Feb 2007 22:35:25 GMT
Author: cutting
Date: Tue Feb 27 14:35:24 2007
New Revision: 512461

URL: http://svn.apache.org/viewvc?view=rev&rev=512461
Log:
HADOOP-1042.  Improve the handling of failed map output fetches.  Contributed by Devaraj.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=512461&r1=512460&r2=512461
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Feb 27 14:35:24 2007
@@ -147,6 +147,9 @@
 43. HADOOP-1036.  Fix exception handling in TaskTracker to keep tasks
     from being lost.  (Arun C Murthy via cutting)
 
+44. HADOOP-1042.  Improve the handling of failed map output fetches.
+    (Devaraj Das via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=512461&r1=512460&r2=512461
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Feb 27 14:35:24 2007
@@ -140,11 +140,6 @@
   private int probe_sample_size = 50;
   
   /**
-   * a Random used during the map output fetching
-   */
-  private Random randForProbing;
-  
-  /**
    * a hashmap from mapId to MapOutputLocation for retrials
    */
   private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
@@ -430,7 +425,8 @@
     
     final int      numOutputs = reduceTask.getNumMaps();
     List           neededOutputs = new ArrayList(numOutputs);
-    List           knownOutputs = new ArrayList(100);
+    Map<Integer, MapOutputLocation> knownOutputs = 
+                                    new HashMap<Integer, MapOutputLocation>();
     int            numInFlight = 0, numCopied = 0;
     int            lowThreshold = numCopiers*2;
     long           bytesTransferred = 0;
@@ -440,7 +436,6 @@
     
     //tweak the probe sample size (make it a function of numCopiers)
     probe_sample_size = Math.max(numCopiers*5, 50);
-    randForProbing = new Random(reduceTask.getPartition() * 100);
     
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
@@ -477,35 +472,38 @@
         LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                  " map output location(s)");
         try {
+          // Put the hash entries for the failed fetches. Entries here
+          // might be replaced by (mapId) hashkeys from new successful 
+          // Map executions, if the fetch failures were due to lost tasks.
+          // The replacements, if at all, will happen when we query the
+          // JobTracker and put the mapId hashkeys with new MapOutputLocations
+          // as values
+          knownOutputs.putAll(retryFetches);
           // the call to queryJobTracker will modify fromEventId to a value
           // that it should be for the next call to queryJobTracker
-          MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient);
+          List <MapOutputLocation> locs = queryJobTracker(fromEventId, 
+                                                          jobClient);
           
           // remove discovered outputs from needed list
           // and put them on the known list
-          int gotLocs = (locs == null ? 0 : locs.length);
-          for (int i=0; i < locs.length; i++) {
+          int gotLocs = (locs == null ? 0 : locs.size());
+          for (int i=0; i < locs.size(); i++) {
             // check whether we actually need an output. It could happen
             // that a map task that successfully ran earlier got lost, but
             // if we already have copied the output of that unfortunate task
             // we need not copy it again from the new TT (we will ignore 
             // the event for the new rescheduled execution)
-            if(neededOutputs.remove(new Integer(locs[i].getMapId()))) {
-              // remove the mapId from the retryFetches hashmap since we now
-              // prefer the new location instead of what we saved earlier
-              retryFetches.remove(new Integer(locs[i].getMapId()));
-              knownOutputs.add(locs[i]);
+            if(neededOutputs.remove(new Integer(locs.get(i).getMapId()))) {
+              knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
             }
             else gotLocs--; //we don't need this output
             
           }
-          // now put the remaining hash entries for the failed fetches 
-          // and clear the hashmap
-          knownOutputs.addAll(retryFetches.values());
           LOG.info(reduceTask.getTaskId() +
                 " Got " + gotLocs + 
                 " new map outputs from jobtracker and " + retryFetches.size() +
                 " map outputs from previous failures");
+          // clear the "failed" fetches hashmap
           retryFetches.clear();
         }
         catch (IOException ie) {
@@ -523,7 +521,7 @@
                " known map output location(s); scheduling...");
 
       synchronized (scheduledCopies) {
-        ListIterator locIt = knownOutputs.listIterator();
+        Iterator locIt = knownOutputs.values().iterator();
 
         currentTime = System.currentTimeMillis();
         while (locIt.hasNext()) {
@@ -596,7 +594,7 @@
             // the failure is due to a lost tasktracker (causes many
             // unnecessary backoffs). If not, we only take a small hit
             // polling the jobtracker a few more times
-            ListIterator locIt = knownOutputs.listIterator();
+            Iterator locIt = knownOutputs.values().iterator();
             while (locIt.hasNext()) {
               MapOutputLocation loc = (MapOutputLocation)locIt.next();
               if (cr.getHost().equals(loc.getHost())) {
@@ -716,7 +714,7 @@
    * @return a set of locations to copy outputs from
    * @throws IOException
    */  
-  private MapOutputLocation[] queryJobTracker(IntWritable fromEventId, 
+  private List <MapOutputLocation> queryJobTracker(IntWritable fromEventId, 
                                               InterTrackerProtocol jobClient)
   throws IOException {
     
@@ -747,11 +745,8 @@
         mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
       }
     }
-    Collections.shuffle(mapOutputsList, randForProbing);
-    MapOutputLocation[] locations =
-                        new MapOutputLocation[mapOutputsList.size()];
     fromEventId.set(fromEventId.get() + t.length);
-    return mapOutputsList.toArray(locations);
+    return mapOutputsList;
   }
 
   



Mime
View raw message