hadoop-common-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r675991 - in /hadoop/core/branches/branch-0.18: ./ src/mapred/org/apache/hadoop/mapred/
Date: Fri, 11 Jul 2008 15:33:14 GMT
Author: acmurthy
Date: Fri Jul 11 08:33:13 2008
New Revision: 675991

URL: http://svn.apache.org/viewvc?rev=675991&view=rev
Log:
Merge -r 675988:675989 from trunk to branch-0.18 to fix HADOOP-3647

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Fri Jul 11 08:33:13 2008
@@ -744,6 +744,9 @@
     HADOOP-3718. Fix KFSOutputStream::write(int) to output a byte instead of
     an int, per the OutputStream contract. (Sriram Rao via cdouglas)
 
+    HADOOP-3647. Add debug logs to help track down a very occasional,
+    hard-to-reproduce, bug in shuffle/merge on the reducer. (acmurthy) 
+
 Release 0.17.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java Fri Jul 11 08:33:13 2008
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -45,7 +47,7 @@
  */
 class IFile {
 
-  private static int EOF_MARKER = -1;
+  private static final int EOF_MARKER = -1;
   
   /**
    * <code>IFile.Writer</code> to write out intermediate map-outputs. 
@@ -54,6 +56,7 @@
     FSDataOutputStream out;
     boolean ownOutputStream = false;
     long start = 0;
+    FSDataOutputStream rawOut;
     
     CompressionOutputStream compressedOut;
     Compressor compressor;
@@ -79,6 +82,9 @@
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class<K> keyClass, Class<V> valueClass,
         CompressionCodec codec) throws IOException {
+      this.rawOut = out;
+      this.start = this.rawOut.getPos();
+      
       if (codec != null) {
         this.compressor = CodecPool.getCompressor(codec);
         this.compressor.reset();
@@ -88,7 +94,6 @@
       } else {
         this.out = out;
       }
-      this.start = this.out.getPos();
       
       this.keyClass = keyClass;
       this.valueClass = valueClass;
@@ -100,34 +105,34 @@
     }
     
     public void close() throws IOException {
+      // Close the serializers
+      keySerializer.close();
+      valueSerializer.close();
+
       // Write EOF_MARKER for key/value length
       WritableUtils.writeVInt(out, EOF_MARKER);
       WritableUtils.writeVInt(out, EOF_MARKER);
       decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
       
       if (compressOutput) {
-        // Return the compressor
+        // Flush data from buffers into the compressor
+        out.flush();
+        
+        // Flush & return the compressor
         compressedOut.finish();
         compressedOut.resetState();
         CodecPool.returnCompressor(compressor);
       }
-      
-      // Close the serializers
-      keySerializer.close();
-      valueSerializer.close();
 
       // Close the stream
-      if (out != null) {
-        out.flush();
-        compressedBytesWritten = out.getPos() - start;
-        
-        // Close the underlying stream iff we own it...
-        if (ownOutputStream) {
-          out.close();
-        }
-        
-        out = null;
+      rawOut.flush();
+      compressedBytesWritten = rawOut.getPos() - start;
+
+      // Close the underlying stream iff we own it...
+      if (ownOutputStream) {
+        out.close();
       }
+      out = null;
     }
 
     public void append(K key, V value) throws IOException {
@@ -141,12 +146,18 @@
       // Append the 'key'
       keySerializer.serialize(key);
       int keyLength = buffer.getLength();
-      if (keyLength == 0)
-        throw new IOException("zero length keys not allowed: " + key);
+      if (keyLength < 0) {
+        throw new IOException("Negative key-length not allowed: " + keyLength + 
+                              " for " + key);
+      }
 
       // Append the 'value'
       valueSerializer.serialize(value);
       int valueLength = buffer.getLength() - keyLength;
+      if (valueLength < 0) {
+        throw new IOException("Negative value-length not allowed: " + 
+                              valueLength + " for " + value);
+      }
       
       // Write the record out
       WritableUtils.writeVInt(out, keyLength);                  // key length
@@ -165,8 +176,17 @@
     public void append(DataInputBuffer key, DataInputBuffer value)
     throws IOException {
       int keyLength = key.getLength() - key.getPosition();
-      int valueLength = value.getLength() - value.getPosition();
+      if (keyLength < 0) {
+        throw new IOException("Negative key-length not allowed: " + keyLength + 
+                              " for " + key);
+      }
       
+      int valueLength = value.getLength() - value.getPosition();
+      if (valueLength < 0) {
+        throw new IOException("Negative value-length not allowed: " + 
+                              valueLength + " for " + value);
+      }
+
       WritableUtils.writeVInt(out, keyLength);
       WritableUtils.writeVInt(out, valueLength);
       out.write(key.getData(), key.getPosition(), keyLength); 
@@ -192,7 +212,7 @@
    */
   public static class Reader<K extends Object, V extends Object> {
     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
-    private static final int MAX_VINT_SIZE = 5;
+    private static final int MAX_VINT_SIZE = 9;
 
     InputStream in;
     Decompressor decompressor;
@@ -204,6 +224,8 @@
     int bufferSize = DEFAULT_BUFFER_SIZE;
     DataInputBuffer dataIn = new DataInputBuffer();
 
+    int recNo = 1;
+
     public Reader(Configuration conf, FileSystem fs, Path file,
                   CompressionCodec codec) throws IOException {
       this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
@@ -226,6 +248,15 @@
     
     public long getLength() { return fileLength; }
     
+    /**
+     * Read upto len bytes into buf starting at offset off.
+     * 
+     * @param buf buffer 
+     * @param off offset
+     * @param len length of buffer
+     * @return the no. of bytes read
+     * @throws IOException
+     */
     private int readData(byte[] buf, int off, int len) throws IOException {
       int bytesRead = 0;
       while (bytesRead < len) {
@@ -291,6 +322,16 @@
         return false;
       }
       
+      // Sanity check
+      if (keyLength < 0) {
+        throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
+                              keyLength);
+      }
+      if (valueLength < 0) {
+        throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
+                              valueLength);
+      }
+      
       final int recordLength = keyLength + valueLength;
       
       // Check if we have the raw key/value in the buffer
@@ -299,7 +340,8 @@
         
         // Sanity check
         if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
-          throw new EOFException("Could read the next record");
+          throw new EOFException("Rec# " + recNo + ": Could read the next " +
+          		                   " record");
         }
       }
 
@@ -310,9 +352,17 @@
       value.reset(data, (pos + keyLength), valueLength);
       
       // Position for the next record
-      dataIn.skip(recordLength);
+      long skipped = dataIn.skip(recordLength);
+      if (skipped != recordLength) {
+        throw new IOException("Rec# " + recNo + ": Failed to skip past record " +
+        		                  "of length: " + recordLength);
+      }
+      
+      // Record the bytes read
       bytesRead += recordLength;
 
+      ++recNo;
+      
       return true;
     }
 
@@ -324,9 +374,7 @@
       }
       
       // Close the underlying stream
-      if (in != null) {
-        in.close();
-      }
+      in.close();
       
       // Release the buffer
       dataIn = null;
@@ -339,18 +387,34 @@
    */
   public static class InMemoryReader<K, V> extends Reader<K, V> {
     RamManager ramManager;
+    TaskAttemptID taskAttemptId;
     
-    public InMemoryReader(RamManager ramManager, 
+    public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
                           byte[] data, int start, int length) {
       this.ramManager = ramManager;
+      this.taskAttemptId = taskAttemptId;
       
       buffer = data;
       fileLength = bufferSize = (length - start);
       dataIn.reset(buffer, start, length);
     }
     
+    private void dumpOnError() {
+      File dumpFile = new File("../output/" + taskAttemptId + ".dump");
+      System.err.println("Dumping corrupt map-output of " + taskAttemptId + 
+                         " to " + dumpFile.getAbsolutePath());
+      try {
+        FileOutputStream fos = new FileOutputStream(dumpFile);
+        fos.write(buffer, 0, bufferSize);
+        fos.close();
+      } catch (IOException ioe) {
+        System.err.println("Failed to dump map-output of " + taskAttemptId);
+      }
+    }
+    
     public boolean next(DataInputBuffer key, DataInputBuffer value) 
     throws IOException {
+      try {
       // Sanity check
       if (eof) {
         throw new EOFException("Completed reading " + bytesRead);
@@ -369,6 +433,16 @@
         return false;
       }
       
+      // Sanity check
+      if (keyLength < 0) {
+        throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
+                              keyLength);
+      }
+      if (valueLength < 0) {
+        throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
+                              valueLength);
+      }
+
       final int recordLength = keyLength + valueLength;
       
       // Setup the key and value
@@ -380,14 +454,20 @@
       // Position for the next record
       long skipped = dataIn.skip(recordLength);
       if (skipped != recordLength) {
-        throw new IOException("Failed to skip past record of length: " + 
+        throw new IOException("Rec# " + recNo + ": Failed to skip past record of length: " + 
                               recordLength);
       }
       
       // Record the byte
       bytesRead += recordLength;
 
+      ++recNo;
+      
       return true;
+      } catch (IOException ioe) {
+        dumpOnError();
+        throw ioe;
+      }
     }
       
     public void close() {
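
For context on what the new sanity checks guard against: an IFile stream frames each record as a VInt key length, a VInt value length, then the raw key and value bytes, and ends with a pair of EOF_MARKER (-1) VInts; MAX_VINT_SIZE grows from 5 to 9 presumably because Hadoop's variable-length encoding can occupy up to 9 bytes for a long. The following is a minimal, self-contained sketch of that framing and of the negative-length checks the patch adds; it is not the real IFile class, and the class and method names are purely illustrative.

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableUtils;

public class IFileFramingSketch {
  private static final int EOF_MARKER = -1;   // same sentinel IFile uses

  // Frame a single record (<vint keyLen><vint valueLen><key bytes><value bytes>),
  // followed by the EOF marker pair, into an in-memory buffer.
  static DataOutputBuffer writeOneRecord(byte[] key, byte[] value) throws IOException {
    DataOutputBuffer out = new DataOutputBuffer();
    WritableUtils.writeVInt(out, key.length);
    WritableUtils.writeVInt(out, value.length);
    out.write(key);
    out.write(value);
    WritableUtils.writeVInt(out, EOF_MARKER);
    WritableUtils.writeVInt(out, EOF_MARKER);
    return out;
  }

  // Walk the records back, applying the same sanity checks the patch adds:
  // a negative length that is not the EOF marker pair means the data is corrupt.
  static int readRecords(byte[] data, int length) throws IOException {
    DataInputBuffer in = new DataInputBuffer();
    in.reset(data, 0, length);
    int recNo = 1;
    while (true) {
      int keyLength = WritableUtils.readVInt(in);
      int valueLength = WritableUtils.readVInt(in);
      if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
        return recNo - 1;                        // clean end of stream
      }
      if (keyLength < 0) {
        throw new IOException("Rec# " + recNo + ": Negative key-length: " + keyLength);
      }
      if (valueLength < 0) {
        throw new IOException("Rec# " + recNo + ": Negative value-length: " + valueLength);
      }
      if (in.getLength() - in.getPosition() < keyLength + valueLength) {
        throw new EOFException("Rec# " + recNo + ": truncated record");
      }
      in.skip(keyLength + valueLength);          // a real reader would hand these bytes out
      ++recNo;
    }
  }

  public static void main(String[] args) throws IOException {
    DataOutputBuffer out = writeOneRecord("key".getBytes(), "value".getBytes());
    System.out.println("records read: " + readRecords(out.getData(), out.getLength()));
  }
}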

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Jul 11 08:33:13 2008
@@ -1002,18 +1002,14 @@
       //The final index file output stream
       FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
                                                         4096);
-      long segmentStart;
-      
       if (numSpills == 0) {
         //create dummy files
         for (int i = 0; i < partitions; i++) {
-          segmentStart = finalOut.getPos();
+          long segmentStart = finalOut.getPos();
           Writer<K, V> writer = new Writer<K, V>(job, finalOut, 
                                                  keyClass, valClass, null);
-          finalIndexOut.writeLong(segmentStart);
-          finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
-          finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
           writer.close();
+          writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
         }
         finalOut.close();
         finalIndexOut.close();
@@ -1054,7 +1050,7 @@
                          job.getOutputKeyComparator(), reporter);
 
           //write merged output to disk
-          segmentStart = finalOut.getPos();
+          long segmentStart = finalOut.getPos();
           Writer<K, V> writer = 
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
           if (null == combinerClass || job.getCombineOnceOnly() ||
@@ -1097,6 +1093,8 @@
       indexOut.writeLong(writer.getRawLength());
       long segmentLength = out.getPos() - start;
       indexOut.writeLong(segmentLength);
+      LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " + 
+               segmentLength + ")");
     }
     
   } // MapOutputBuffer
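
The index record that writeIndexRecord emits (and that the new LOG.info line echoes) is one fixed-size entry per partition: the segment's start offset in the data file, its raw (decompressed) length, and its on-disk length, written as three longs. The TaskTracker later seeks to reduce * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH and reads the same three longs back (see the TaskTracker.java diff below). Here is a small sketch of that layout, assuming the record really is just three 8-byte longs; names are illustrative, not the Hadoop API.

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class IndexRecordSketch {
  // Assumed layout: three longs per partition, i.e. 24 bytes per record.
  static final int INDEX_RECORD_LENGTH = 3 * 8;

  // Append one index record: (segmentStart, rawLength, segmentLength).
  static void writeIndexRecord(DataOutputBuffer indexOut, long segmentStart,
                               long rawLength, long segmentLength) throws IOException {
    indexOut.writeLong(segmentStart);
    indexOut.writeLong(rawLength);
    indexOut.writeLong(segmentLength);
  }

  // Look up the record for a given reduce the way TaskTracker does:
  // position at reduce * record-length, then read the three longs.
  static long[] readIndexRecord(byte[] index, int reduce) throws IOException {
    DataInputBuffer in = new DataInputBuffer();
    in.reset(index, reduce * INDEX_RECORD_LENGTH, INDEX_RECORD_LENGTH);
    return new long[] { in.readLong(), in.readLong(), in.readLong() };
  }

  public static void main(String[] args) throws IOException {
    DataOutputBuffer indexOut = new DataOutputBuffer();
    writeIndexRecord(indexOut, 0L, 100L, 64L);      // partition 0
    writeIndexRecord(indexOut, 64L, 250L, 180L);    // partition 1
    long[] rec = readIndexRecord(indexOut.getData(), 1);
    System.out.println("Index: (" + rec[0] + ", " + rec[1] + ", " + rec[2] + ")");
  }
}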

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Jul 11 08:33:13 2008
@@ -52,6 +52,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
@@ -59,6 +60,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
@@ -663,6 +665,7 @@
     /** Describes the output of a map; could either be on disk or in-memory. */
     private class MapOutput {
       final TaskID mapId;
+      final TaskAttemptID mapAttemptId;
       
       final Path file;
       final Configuration conf;
@@ -671,8 +674,10 @@
       final boolean inMemory;
       long size;
       
-      public MapOutput(TaskID mapId, Configuration conf, Path file, long size) {
+      public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, 
+                       Configuration conf, Path file, long size) {
         this.mapId = mapId;
+        this.mapAttemptId = mapAttemptId;
         
         this.conf = conf;
         this.file = file;
@@ -683,8 +688,9 @@
         this.inMemory = false;
       }
       
-      public MapOutput(TaskID mapId, byte[] data) {
+      public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data) {
         this.mapId = mapId;
+        this.mapAttemptId = mapAttemptId;
         
         this.file = null;
         this.conf = null;
@@ -1177,7 +1183,8 @@
         // Copy map-output into an in-memory buffer
         byte[] shuffleData = new byte[mapOutputLength];
         MapOutput mapOutput = 
-          new MapOutput(mapOutputLoc.getTaskId(), shuffleData);
+          new MapOutput(mapOutputLoc.getTaskId(), 
+                        mapOutputLoc.getTaskAttemptId(), shuffleData);
         
         int bytesRead = 0;
         try {
@@ -1246,6 +1253,16 @@
           );
         }
 
+        // TODO: Remove this after a 'fix' for HADOOP-3647
+        if (mapOutputLength > 0) {
+          DataInputBuffer dib = new DataInputBuffer();
+          dib.reset(shuffleData, 0, shuffleData.length);
+          LOG.info("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> (" + 
+                   WritableUtils.readVInt(dib) + ", " + 
+                   WritableUtils.readVInt(dib) + ") from " + 
+                   mapOutputLoc.getHost());
+        }
+        
         return mapOutput;
       }
       
@@ -1260,8 +1277,8 @@
                                          mapOutputLength, conf);
 
         MapOutput mapOutput = 
-          new MapOutput(mapOutputLoc.getTaskId(), conf, 
-                        localFileSys.makeQualified(localFilename), 
+          new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), 
+                        conf, localFileSys.makeQualified(localFilename), 
                         mapOutputLength);
 
 
@@ -1806,7 +1823,7 @@
           MapOutput mo = mapOutputsFilesInMemory.remove(0);
           
           Reader<K, V> reader = 
-            new InMemoryReader<K, V>(ramManager, 
+            new InMemoryReader<K, V>(ramManager, mo.mapAttemptId,
                                      mo.data, 0, mo.data.length);
           Segment<K, V> segment = 
             new Segment<K, V>(reader, true);
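
Much of the ReduceTask change is plumbing: MapOutput now carries the map TaskAttemptID so the InMemoryReader added in IFile.java can name its dump file after the offending attempt, and a temporary log line decodes the first record's key/value length VInts straight out of the shuffled byte buffer. A stripped-down sketch of that combination follows; the dump directory, attempt-id string, and class name are placeholders, and the EOF_MARKER case is ignored for brevity.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.WritableUtils;

public class DumpOnErrorSketch {
  private final byte[] buffer;     // raw in-memory map-output, as shuffled
  private final String attemptId;  // placeholder for the map TaskAttemptID string

  DumpOnErrorSketch(byte[] buffer, String attemptId) {
    this.buffer = buffer;
    this.attemptId = attemptId;
  }

  // Best-effort dump of the suspect buffer; a failure to dump is only logged
  // so that it never masks the original error.
  private void dumpOnError() {
    File dumpFile = new File(System.getProperty("java.io.tmpdir"), attemptId + ".dump");
    System.err.println("Dumping corrupt map-output of " + attemptId +
                       " to " + dumpFile.getAbsolutePath());
    try (FileOutputStream fos = new FileOutputStream(dumpFile)) {
      fos.write(buffer, 0, buffer.length);
    } catch (IOException ioe) {
      System.err.println("Failed to dump map-output of " + attemptId);
    }
  }

  // Decode just the leading key/value lengths; on any error, dump the whole
  // buffer first and rethrow, mirroring InMemoryReader.next() in the patch.
  int[] firstRecordLengths() throws IOException {
    try {
      DataInputBuffer in = new DataInputBuffer();
      in.reset(buffer, 0, buffer.length);
      int keyLength = WritableUtils.readVInt(in);
      int valueLength = WritableUtils.readVInt(in);
      if (keyLength < 0 || valueLength < 0) {
        throw new IOException("Corrupt map-output from " + attemptId);
      }
      return new int[] { keyLength, valueLength };
    } catch (IOException ioe) {
      dumpOnError();
      throw ioe;
    }
  }
}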

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Jul 11 08:33:13 2008
@@ -59,6 +59,7 @@
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -2422,9 +2423,9 @@
         indexIn.seek(reduce * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
           
         //read the offset and length of the partition data
-        long startOffset = indexIn.readLong();
-        long rawPartLength = indexIn.readLong();
-        long partLength = indexIn.readLong();
+        final long startOffset = indexIn.readLong();
+        final long rawPartLength = indexIn.readLong();
+        final long partLength = indexIn.readLong();
 
         indexIn.close();
         indexIn = null;
@@ -2447,6 +2448,23 @@
          */
         //open the map-output file
         mapOutputIn = fileSys.open(mapOutputFileName);
+        
+        // TODO: Remove this after a 'fix' for HADOOP-3647
+        // The clever trick here to reduce the impact of the extra seek for
+        // logging the first key/value lengths is to read the lengths before
+        // the second seek for the actual shuffle. The second seek is almost
+        // a no-op since it is very short (go back length of two VInts) and the 
+        // data is almost guaranteed to be in the filesystem's buffers.
+        // WARN: This won't work for compressed map-outputs!
+        int firstKeyLength = 0;
+        int firstValueLength = 0;
+        if (partLength > 0) {
+          mapOutputIn.seek(startOffset);
+          firstKeyLength = WritableUtils.readVInt(mapOutputIn);
+          firstValueLength = WritableUtils.readVInt(mapOutputIn);
+        }
+        
+
         //seek to the correct offset for the reduce
         mapOutputIn.seek(startOffset);
           
@@ -2472,7 +2490,8 @@
         
         LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
                  " from map: " + mapId + " given " + partLength + "/" + 
-                 rawPartLength);
+                 rawPartLength + " from " + startOffset + " with (" + 
+                 firstKeyLength + ", " + firstValueLength + ")");
       } catch (IOException ie) {
         TaskTracker tracker = 
           (TaskTracker) context.getAttribute("task.tracker");
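
The long comment in this hunk spells the trick out: seek to the partition's start offset, decode the first key/value length VInts purely for logging, then seek back to the same offset before streaming the partition to the reducer. The second seek is nearly free because the bytes just read are still in the filesystem cache, and, as the comment warns, this only works for uncompressed map-outputs. Below is a local-file sketch of the same sequence, using RandomAccessFile in place of the HDFS/local-FS streams; the file name and offsets are illustrative.

import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.hadoop.io.WritableUtils;

public class PeekThenReseekSketch {

  // Log the first record's key/value lengths for a partition, then leave the
  // stream positioned back at the partition start for the real transfer.
  // As in the patch, this assumes the partition starts with two plain VInts,
  // which is not true for compressed map-outputs.
  static void peekFirstLengths(RandomAccessFile mapOutputIn,
                               long startOffset, long partLength) throws IOException {
    int firstKeyLength = 0;
    int firstValueLength = 0;
    if (partLength > 0) {
      mapOutputIn.seek(startOffset);                     // extra seek, for logging only
      firstKeyLength = WritableUtils.readVInt(mapOutputIn);
      firstValueLength = WritableUtils.readVInt(mapOutputIn);
    }
    mapOutputIn.seek(startOffset);                       // near no-op: data is still cached
    System.out.println("First record at " + startOffset + " -> (" +
                       firstKeyLength + ", " + firstValueLength + ")");
  }

  public static void main(String[] args) throws IOException {
    // Illustrative usage against a hypothetical local map-output file.
    try (RandomAccessFile in = new RandomAccessFile("attempt_0_m_000000_0.out", "r")) {
      peekFirstLengths(in, 0L, in.length());
    }
  }
}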


