hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r504640 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskTracker.java
Date Wed, 07 Feb 2007 18:36:41 GMT
Author: cutting
Date: Wed Feb  7 10:36:40 2007
New Revision: 504640

URL: http://svn.apache.org/viewvc?view=rev&rev=504640
Log:
HADOOP-984.  Fix a bug in shuffle error handling introduced by HADOOP-331.  Contributed by
Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=504640&r1=504639&r2=504640
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Feb  7 10:36:40 2007
@@ -8,6 +8,10 @@
  2. HADOOP-917.  Fix a NullPointerException in SequenceFile's merger
     with large map outputs.  (omalley via cutting)
 
+ 3. HADOOP-984.  Fix a bug in shuffle error handling introduced by
+    HADOOP-331.  If a map output is unavailable, the job tracker is
+    once more informed.  (Arun C Murthy via cutting)
+
 
 Release 0.11.0 - 2007-02-02
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=504640&r1=504639&r2=504640
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Feb  7 10:36:40 2007
@@ -1526,30 +1526,47 @@
         JobConf conf = (JobConf) context.getAttribute("conf");
         FileSystem fileSys = 
           (FileSystem) context.getAttribute("local.file.system");
-        //open index file
+
+        // Index file
         Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
-        FSDataInputStream in = fileSys.open(indexFileName);
-        //seek to the correct offset for the given reduce
-        in.seek(reduce * 16);
-        
-        //read the offset and length of the partition data
-        long startOffset = in.readLong();
-        long partLength = in.readLong();
-        
-        in.close();
+        FSDataInputStream indexIn = null;
          
+        // Map-output file
         Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out"); 
-           
-        response.setContentLength((int) partLength);
-        FSDataInputStream inStream = null;
+        FSDataInputStream mapOutputIn = null;
+        
         // true iff IOException was caused by attempt to access input
         boolean isInputException = true;
+        
         try {
-          inStream = fileSys.open(mapOutputFileName);
-          inStream.seek(startOffset);
+          /**
+           * Read the index file to get the information about where
+           * the map-output for the given reducer is available. 
+           */
+          //open index file
+          indexIn = fileSys.open(indexFileName);
+
+          //seek to the correct offset for the given reduce
+          indexIn.seek(reduce * 16);
+          
+          //read the offset and length of the partition data
+          long startOffset = indexIn.readLong();
+          long partLength = indexIn.readLong();
+
+          //set the content-length header
+          response.setContentLength((int) partLength);
+
+          /**
+           * Read the data from the single map-output file and
+           * send it to the reducer.
+           */
+          //open the map-output file
+          mapOutputIn = fileSys.open(mapOutputFileName);
+          //seek to the correct offset for the reduce
+          mapOutputIn.seek(startOffset);
           try {
             int totalRead = 0;
-            int len = inStream.read(buffer, 0,
+            int len = mapOutputIn.read(buffer, 0,
                                  partLength < MAX_BYTES_TO_READ 
                                  ? (int)partLength : MAX_BYTES_TO_READ);
             while (len > 0) {
@@ -1561,12 +1578,17 @@
               }
               totalRead += len;
               if (totalRead == partLength) break;
-              len = inStream.read(buffer, 0, 
+              len = mapOutputIn.read(buffer, 0, 
                       (partLength - totalRead) < MAX_BYTES_TO_READ
                        ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
             }
           } finally {
-            inStream.close();
+            if (indexIn != null) {
+              indexIn.close();
+            }
+            if (mapOutputIn != null) {
+              mapOutputIn.close();
+            }
           }
         } catch (IOException ie) {
           TaskTracker tracker = 



Mime
View raw message