hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r794637 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MRConstants.java src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/TaskTracker.java
Date Thu, 16 Jul 2009 11:47:02 GMT
Author: ddas
Date: Thu Jul 16 11:47:01 2009
New Revision: 794637

URL: http://svn.apache.org/viewvc?rev=794637&view=rev
Log:
MAPREDUCE-18. Adds checks to verify that a reduce task receives the correct shuffle
data. Re-committed the same patch after talking to Ravi.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=794637&r1=794636&r2=794637&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 16 11:47:01 2009
@@ -214,3 +214,7 @@
     
     MAPREDUCE-680. Fix so MRUnit can handle reuse of Writable objects.
     (Aaron Kimball via johan)
+
+    MAPREDUCE-18. Puts some checks for cross checking whether a reduce
+    task gets the correct shuffle data. (Ravi Gummadi via ddas)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?rev=794637&r1=794636&r2=794637&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Thu Jul 16 11:47:01 2009
@@ -45,5 +45,15 @@
    */
   public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
 
+  /**
+   * The map task from which the map output data is being transferred
+   */
+  public static final String FROM_MAP_TASK = "from-map-task";
+  
+  /**
+   * The reduce task number for which this map output is being transferred
+   */
+  public static final String FOR_REDUCE_TASK = "for-reduce-task";
+  
   public static final String WORKDIR = "work";
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=794637&r1=794636&r2=794637&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jul 16 11:47:01 2009
@@ -1294,7 +1294,8 @@
         Path tmpMapOutput = new Path(filename+"-"+id);
         
         // Copy the map output
-        MapOutput mapOutput = getMapOutput(loc, tmpMapOutput);
+        MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
+                                           reduceId.getTaskID().getId());
         if (mapOutput == null) {
           throw new IOException("Failed to fetch map-output for " + 
                                 loc.getTaskAttemptId() + " from " + 
@@ -1375,7 +1376,7 @@
        * @throws IOException when something goes wrong
        */
       private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
-                                     Path filename)
+                                     Path filename, int reduce)
       throws IOException, InterruptedException {
         // Connect
         URLConnection connection = 
@@ -1383,16 +1384,52 @@
         InputStream input = getInputStream(connection, shuffleConnectionTimeout,
                                            shuffleReadTimeout); 
 
+        // Validate header from map output
+        TaskAttemptID mapId = null;
+        try {
+          mapId =
+            TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
+        } catch (IllegalArgumentException ia) {
+          LOG.warn("Invalid map id ", ia);
+          return null;
+        }
+        TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
+        if (!mapId.equals(expectedMapId)) {
+          LOG.warn("data from wrong map:" + mapId +
+              " arrived to reduce task " + reduce +
+              ", where as expected map output should be from " + expectedMapId);
+          return null;
+        }
+
+        long decompressedLength =
+          Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
+        long compressedLength =
+          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
+
+        if (compressedLength < 0 || decompressedLength < 0) {
+          LOG.warn(getName() + " invalid lengths in map output header: id: " +
+              mapId + " compressed len: " + compressedLength +
+              ", decompressed len: " + decompressedLength);
+          return null;
+        }
+        int forReduce =
+          (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
+
+        if (forReduce != reduce) {
+          LOG.warn("data for the wrong reduce: " + forReduce +
+              " with compressed len: " + compressedLength +
+              ", decompressed len: " + decompressedLength +
+              " arrived to reduce task " + reduce);
+          return null;
+        }
+        LOG.info("header: " + mapId + ", compressed len: " + compressedLength +
+            ", decompressed len: " + decompressedLength);
+
         //We will put a file in memory if it meets certain criteria:
         //1. The size of the (decompressed) file should be less than 25% of 
         //    the total inmem fs
         //2. There is space available in the inmem fs
 
-        long decompressedLength = 
-          Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
-        long compressedLength = 
-          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-
         // Check if this map-output can be saved in-memory
         boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=794637&r1=794636&r2=794637&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Jul 16 11:47:01 2009
@@ -2926,9 +2926,13 @@
          * Read the index file to get the information about where
          * the map-output for the given reducer is available. 
          */
-       IndexRecord info = 
+        IndexRecord info = 
           tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
           
+        //set the custom "from-map-task" http header to the map task from which
+        //the map output data is being transferred
+        response.setHeader(FROM_MAP_TASK, mapId);
+        
         //set the custom "Raw-Map-Output-Length" http header to 
         //the raw (decompressed) length
         response.setHeader(RAW_MAP_OUTPUT_LENGTH,
@@ -2939,6 +2943,10 @@
         response.setHeader(MAP_OUTPUT_LENGTH,
             Long.toString(info.partLength));
 
+        //set the custom "for-reduce-task" http header to the reduce task number
+        //for which this map output is being transferred
+        response.setHeader(FOR_REDUCE_TASK, Integer.toString(reduce));
+        
         //use the same buffersize as used for reading the data from disk
         response.setBufferSize(MAX_BYTES_TO_READ);
         



Mime
View raw message