hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject svn commit: r1363456 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/...
Date Thu, 19 Jul 2012 18:22:53 GMT
Author: tgraves
Date: Thu Jul 19 18:22:53 2012
New Revision: 1363456

URL: http://svn.apache.org/viewvc?rev=1363456&view=rev
Log:
merge -r 1363454:1363455 from branch-2. FIXES: MAPREDUCE-4423

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
      - copied unchanged from r1363455, hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1363456&r1=1363455&r2=1363456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Jul 19 18:22:53
2012
@@ -343,6 +343,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4448. Fix NM crash during app cleanup if aggregation didn't
     init. (Jason Lowe via daryn)
 
+    MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans
+    via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1363456&r1=1363455&r2=1363456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
Thu Jul 19 18:22:53 2012
@@ -49,7 +49,8 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings({"deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
 class Fetcher<K,V> extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -175,13 +176,18 @@ class Fetcher<K,V> extends Thread {
     }
   }
 
+  @VisibleForTesting
+  protected HttpURLConnection openConnection(URL url) throws IOException {
+    return (HttpURLConnection)url.openConnection();
+  }
+  
   /**
    * The crux of the matter...
    * 
    * @param host {@link MapHost} from which we need to  
    *              shuffle available map-outputs.
    */
-  private void copyFromHost(MapHost host) throws IOException {
+  protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
     List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
     
@@ -191,9 +197,11 @@ class Fetcher<K,V> extends Thread {
       return;
     }
     
-    LOG.debug("Fetcher " + id + " going to fetch from " + host);
-    for (TaskAttemptID tmp: maps) {
-      LOG.debug(tmp);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host);
+      for (TaskAttemptID tmp: maps) {
+        LOG.debug(tmp);
+      }
     }
     
     // List of maps to be fetched yet
@@ -205,7 +213,7 @@ class Fetcher<K,V> extends Thread {
     
     try {
       URL url = getMapOutputURL(host, maps);
-      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+      HttpURLConnection connection = openConnection(url);
       
       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -266,17 +274,24 @@ class Fetcher<K,V> extends Thread {
     
     try {
       // Loop through available map-outputs and fetch them
-      // On any error, good becomes false and we exit after putting back
-      // the remaining maps to the yet_to_be_fetched list
-      boolean good = true;
-      while (!remaining.isEmpty() && good) {
-        good = copyMapOutput(host, input, remaining);
+      // On any error, faildTasks is not null and we exit
+      // after putting back the remaining maps to the 
+      // yet_to_be_fetched list and marking the failed tasks.
+      TaskAttemptID[] failedTasks = null;
+      while (!remaining.isEmpty() && failedTasks == null) {
+        failedTasks = copyMapOutput(host, input, remaining);
+      }
+      
+      if(failedTasks != null) {
+        for(TaskAttemptID left: failedTasks) {
+          scheduler.copyFailed(left, host, true);
+        }
       }
       
       IOUtils.cleanup(LOG, input);
       
       // Sanity check
-      if (good && !remaining.isEmpty()) {
+      if (failedTasks == null && !remaining.isEmpty()) {
         throw new IOException("server didn't return all expected map outputs: "
             + remaining.size() + " left.");
       }
@@ -285,10 +300,9 @@ class Fetcher<K,V> extends Thread {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
-      
-   }
+  }
   
-  private boolean copyMapOutput(MapHost host,
+  private TaskAttemptID[] copyMapOutput(MapHost host,
                                 DataInputStream input,
                                 Set<TaskAttemptID> remaining) {
     MapOutput<K,V> mapOutput = null;
@@ -310,14 +324,15 @@ class Fetcher<K,V> extends Thread {
       } catch (IllegalArgumentException e) {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
-        return false;
+        //Don't know which one was bad, so consider all of them as bad
+        return remaining.toArray(new TaskAttemptID[remaining.size()]);
       }
 
  
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, mapId)) {
-        return false;
+        return new TaskAttemptID[] {mapId};
       }
       
       LOG.debug("header: " + mapId + ", len: " + compressedLength + 
@@ -329,7 +344,7 @@ class Fetcher<K,V> extends Thread {
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        return false;
+        return new TaskAttemptID[] {mapId};
       } 
       
       // Go!
@@ -351,14 +366,18 @@ class Fetcher<K,V> extends Thread {
       // Note successful shuffle
       remaining.remove(mapId);
       metrics.successFetch();
-      return true;
+      return null;
     } catch (IOException ioe) {
       ioErrs.increment(1);
       if (mapId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" + 
                  mapId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
-        return false;
+        if(mapId == null) {
+          return remaining.toArray(new TaskAttemptID[remaining.size()]);
+        } else {
+          return new TaskAttemptID[] {mapId};
+        }
       }
       
       LOG.info("Failed to shuffle output of " + mapId + 
@@ -366,9 +385,8 @@ class Fetcher<K,V> extends Thread {
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
-      scheduler.copyFailed(mapId, host, true);
       metrics.failedFetch();
-      return false;
+      return new TaskAttemptID[] {mapId};
     }
 
   }



Mime
View raw message