hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r515840 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
Date: Wed, 07 Mar 2007 23:13:22 GMT
Author: cutting
Date: Wed Mar  7 15:13:21 2007
New Revision: 515840

URL: http://svn.apache.org/viewvc?view=rev&rev=515840
Log:
HADOOP-1077.  Fix a race condition fetching map outputs that could hang reduces.  Contributed by Devaraj.
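
For context, the heart of the patch is a thread-safe set of still-needed map output IDs that each copier consults before starting a fetch, again after the transfer completes, and once more before renaming the file into place; a copy whose output is no longer needed is reported with a sentinel size and quietly dropped instead of being re-queued. Below is a minimal, hypothetical sketch of that guard pattern only -- the class and method names (ObsoleteCopyGuard, need, fetch) are illustrative and are not the actual ReduceTaskRunner code:

import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Illustrative stand-in for the copier-side guard introduced by this patch.
class ObsoleteCopyGuard {
  // Sentinel "size" returned when a copy result should be ignored.
  static final long OBSOLETE = -2;

  // Map IDs whose output this reduce still needs; synchronized so the
  // fetch loop and multiple copier threads can consult it safely.
  private final Set<Integer> neededOutputs =
      Collections.synchronizedSet(new TreeSet<Integer>());

  void need(int mapId) {
    neededOutputs.add(mapId);
  }

  // Runs on a copier thread; fetch() stands in for the real transfer.
  long copyOutput(int mapId) {
    if (!neededOutputs.contains(mapId)) {
      return OBSOLETE;               // output no longer wanted: skip the fetch
    }
    long bytes = fetch(mapId);       // possibly slow network copy
    // Re-check after the transfer: another attempt for the same map may
    // have completed, or the map may have been re-run, in the meantime.
    if (!neededOutputs.contains(mapId)) {
      return OBSOLETE;               // discard rather than retry, so the reduce cannot hang
    }
    neededOutputs.remove(mapId);     // success: stop asking for this output
    return bytes;
  }

  private long fetch(int mapId) {
    return 0L;                       // placeholder for the actual HTTP/file copy
  }
}

In the patch itself the membership test is repeated a third time inside the synchronized block that renames the temporary file, and the fetch loop logs and ignores results flagged OBSOLETE instead of putting them back on retryFetches.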

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=515840&r1=515839&r2=515840
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Mar  7 15:13:21 2007
@@ -18,6 +18,8 @@
  4. HADOOP-1060.  Fix an IndexOutOfBoundsException in the JobTracker
     that could cause jobs to hang.  (Arun C Murthy via cutting)
 
+ 5. HADOOP-1077.  Fix a race condition fetching map outputs that could
+    hang reduces.  (Devaraj Das via cutting)
 
 Release 0.12.0 - 2007-03-02
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=515840&r1=515839&r2=515840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed Mar  7 15:13:21 2007
@@ -144,6 +144,12 @@
    */
   private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
   
+  /** 
+   * a TreeSet for needed map outputs
+   */
+  private Set <Integer> neededOutputs = 
+    Collections.synchronizedSet(new TreeSet<Integer>());
+  
   /** Represents the result of an attempt to copy a map output */
   private class CopyResult {
     
@@ -152,7 +158,10 @@
     
     // the size of the file copied, -1 if the transfer failed
     private final long size;
-        
+    
+    //a flag signifying whether a copy result is obsolete
+    private static final int OBSOLETE = -2;
+    
     CopyResult(MapOutputLocation loc, long size) {
       this.loc = loc;
       this.size = size;
@@ -160,6 +169,9 @@
     
     public int getMapId() { return loc.getMapId(); }
     public boolean getSuccess() { return size >= 0; }
+    public boolean isObsolete() { 
+      return size == OBSOLETE;
+    }
     public long getSize() { return size; }
     public String getHost() { return loc.getHost(); }
     public MapOutputLocation getLocation() { return loc; }
@@ -284,7 +296,9 @@
      */
     private long copyOutput(MapOutputLocation loc
                             ) throws IOException, InterruptedException {
-
+      if (!neededOutputs.contains(loc.getMapId())) {
+        return CopyResult.OBSOLETE;
+      }
       String reduceId = reduceTask.getTaskId();
       LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
                " output from " + loc.getHost() + ".");
@@ -297,16 +311,28 @@
       tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
                                tmpFilename, reduceTask.getPartition(),
                                STALLED_COPY_TIMEOUT);
+      if (!neededOutputs.contains(loc.getMapId())) {
+        if (tmpFilename != null) {
+          FileSystem fs = tmpFilename.getFileSystem(conf);
+          fs.delete(tmpFilename);
+        }
+        return CopyResult.OBSOLETE;
+      }
       if (tmpFilename == null)
         throw new IOException("File " + finalFilename + "-" + id + 
                               " not created");
       long bytes = -1;
       // lock the ReduceTaskRunner while we do the rename
       synchronized (ReduceTaskRunner.this) {
-        // if we can't rename the file, something is broken (and IOException
-        // will be thrown). This file could have been created in the inmemory
+        // This file could have been created in the inmemory
         // fs or the localfs. So need to get the filesystem owning the path. 
         FileSystem fs = tmpFilename.getFileSystem(conf);
+        if (!neededOutputs.contains(loc.getMapId())) {
+          fs.delete(tmpFilename);
+          return CopyResult.OBSOLETE;
+        }
+        // if we can't rename the file, something is broken (and IOException
+        // will be thrown). 
         if (!fs.rename(tmpFilename, finalFilename)) {
           fs.delete(tmpFilename);
           throw new IOException("failure to rename map output " + tmpFilename);
@@ -332,6 +358,7 @@
           mergeInProgress = true;
           m.start();
         }
+        neededOutputs.remove(loc.getMapId());
       }
       return bytes;
     }
@@ -424,7 +451,6 @@
     this.mapOutputFile.removeAll(reduceTask.getTaskId());
     
     final int      numOutputs = reduceTask.getNumMaps();
-    List           neededOutputs = new ArrayList(numOutputs);
     Map<Integer, MapOutputLocation> knownOutputs = 
                                     new HashMap<Integer, MapOutputLocation>();
     int            numInFlight = 0, numCopied = 0;
@@ -484,23 +510,12 @@
           List <MapOutputLocation> locs = queryJobTracker(fromEventId, 
                                                           jobClient);
           
-          // remove discovered outputs from needed list
-          // and put them on the known list
-          int gotLocs = (locs == null ? 0 : locs.size());
+          // put the discovered outputs on the known list
           for (int i=0; i < locs.size(); i++) {
-            // check whether we actually need an output. It could happen
-            // that a map task that successfully ran earlier got lost, but
-            // if we already have copied the output of that unfortunate task
-            // we need not copy it again from the new TT (we will ignore 
-            // the event for the new rescheduled execution)
-            if(neededOutputs.remove(new Integer(locs.get(i).getMapId()))) {
-              knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
-            }
-            else gotLocs--; //we don't need this output
-            
+            knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
           }
           LOG.info(reduceTask.getTaskId() +
-                " Got " + gotLocs + 
+                " Got " + locs.size() + 
                 " new map outputs from jobtracker and " + retryFetches.size() +
                 " map outputs from previous failures");
           // clear the "failed" fetches hashmap
@@ -575,9 +590,13 @@
             copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + 
                                 " at " +
                                 mbpsFormat.format(transferRate) +  " MB/s)");          
+          } else if (cr.isObsolete()) {
+            //ignore
+            LOG.info(reduceTask.getTaskId() + 
+                " Ignoring obsolete copy result for Map Task: " + 
+                cr.getLocation().getMapTaskId() + " from host: " + 
+                cr.getHost());
           } else {
-            // this copy failed, put it back onto neededOutputs
-            neededOutputs.add(new Integer(cr.getMapId()));
             retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
           
             // wait a random amount of time for next contact
@@ -600,7 +619,6 @@
               if (cr.getHost().equals(loc.getHost())) {
                 retryFetches.put(new Integer(loc.getMapId()), loc);
                 locIt.remove();
-                neededOutputs.add(new Integer(loc.getMapId()));
               }
             }
           }


