hadoop-common-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r765062 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/IFile.java src/mapred/org/apache/hadoop/mapred/Merger.java src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Date Wed, 15 Apr 2009 06:14:00 GMT
Author: cdouglas
Date: Wed Apr 15 06:13:59 2009
New Revision: 765062

URL: http://svn.apache.org/viewvc?rev=765062&view=rev
Log:
HADOOP-5494. Modify sorted map output merger to lazily read values,
rather than buffering at least one record for each segment. Contributed by Devaraj Das.
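
For readers skimming the patch below, here is a minimal sketch of the pattern this change introduces. This is illustrative code, not the committed code: LazySegment and the heap setup are hypothetical stand-ins for Merger.Segment and its priority queue. The essence is that the reader's single next(key, value) is split into nextRawKey()/nextRawValue(), so the merge heap is ordered by key alone and value bytes are deserialized only when a record is actually emitted.

import java.io.IOException;
import java.util.PriorityQueue;

// Illustrative stand-in for a merge segment after this patch: keys are read
// eagerly for heap ordering, value bytes only on demand.
interface LazySegment {
  boolean nextRawKey() throws IOException;   // advance; false at end of segment
  byte[] currentKey();                       // key bytes for comparison
  byte[] nextRawValue() throws IOException;  // deserialize the value lazily
}

class LazyMergeSketch {
  // Before this patch each segment sat in the heap holding a fully buffered
  // (key, value) record; now the value of the minimum segment is read just
  // before the record is handed to the consumer.
  static void merge(PriorityQueue<LazySegment> heap) throws IOException {
    // heap is assumed to be constructed with a comparator over currentKey()
    while (!heap.isEmpty()) {
      LazySegment min = heap.poll();
      emit(min.currentKey(), min.nextRawValue());  // value read only here
      if (min.nextRawKey()) {
        heap.add(min);  // reinsert under its next key
      }
    }
  }

  static void emit(byte[] key, byte[] value) {
    System.out.println(new String(key) + " -> " + new String(value));
  }
}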

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=765062&r1=765061&r2=765062&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 15 06:13:59 2009
@@ -229,6 +229,10 @@
     HADOOP-5509. PendingReplicationBlocks does not start monitor in the
     constructor. (shv)
 
+    HADOOP-5494. Modify sorted map output merger to lazily read values,
+    rather than buffering at least one record for each segment. (Devaraj Das
+    via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=765062&r1=765061&r2=765062&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Wed Apr 15 06:13:59 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -250,12 +252,16 @@
     final long fileLength;
     boolean eof = false;
     final IFileInputStream checksumIn;
+    DataInputStream dataIn;
     
     byte[] buffer = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
-    DataInputBuffer dataIn = new DataInputBuffer();
 
     int recNo = 1;
+    int currentKeyLength;
+    int currentValueLength;
+    byte keyBytes[] = new byte[0];
+    
     
     /**
      * Construct an IFile Reader.
@@ -298,6 +304,7 @@
       } else {
         this.in = checksumIn;
       }
+      this.dataIn = new DataInputStream(this.in);
       this.fileLength = length;
       
       if (conf != null) {
@@ -334,104 +341,70 @@
       return len;
     }
     
-    void readNextBlock(int minSize) throws IOException {
-      if (buffer == null) {
-        buffer = new byte[bufferSize];
-        dataIn.reset(buffer, 0, 0);
-      }
-      buffer = 
-        rejigData(buffer, 
-                  (bufferSize < minSize) ? new byte[minSize << 1] : buffer);
-      bufferSize = buffer.length;
-    }
-    
-    private byte[] rejigData(byte[] source, byte[] destination) 
-    throws IOException{
-      // Copy remaining data into the destination array
-      int bytesRemaining = dataIn.getLength()-dataIn.getPosition();
-      if (bytesRemaining > 0) {
-        System.arraycopy(source, dataIn.getPosition(), 
-            destination, 0, bytesRemaining);
-      }
-      
-      // Read as much data as will fit from the underlying stream 
-      int n = readData(destination, bytesRemaining, 
-                       (destination.length - bytesRemaining));
-      dataIn.reset(destination, 0, (bytesRemaining + n));
-      
-      return destination;
-    }
-    
-    public boolean next(DataInputBuffer key, DataInputBuffer value) 
-    throws IOException {
+    protected boolean positionToNextRecord(DataInput dIn) throws IOException {
       // Sanity check
       if (eof) {
         throw new EOFException("Completed reading " + bytesRead);
       }
       
-      // Check if we have enough data to read lengths
-      if ((dataIn.getLength() - dataIn.getPosition()) < 2*MAX_VINT_SIZE) {
-        readNextBlock(2*MAX_VINT_SIZE);
-      }
-      
       // Read key and value lengths
-      int oldPos = dataIn.getPosition();
-      int keyLength = WritableUtils.readVInt(dataIn);
-      int valueLength = WritableUtils.readVInt(dataIn);
-      int pos = dataIn.getPosition();
-      bytesRead += pos - oldPos;
+      currentKeyLength = WritableUtils.readVInt(dIn);
+      currentValueLength = WritableUtils.readVInt(dIn);
+      bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
+                   WritableUtils.getVIntSize(currentValueLength);
       
       // Check for EOF
-      if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
+      if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
         eof = true;
         return false;
       }
       
       // Sanity check
-      if (keyLength < 0) {
+      if (currentKeyLength < 0) {
         throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
-                              keyLength);
+                              currentKeyLength);
       }
-      if (valueLength < 0) {
+      if (currentValueLength < 0) {
         throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
-                              valueLength);
+                              currentValueLength);
       }
-      
-      final int recordLength = keyLength + valueLength;
-      
-      // Check if we have the raw key/value in the buffer
-      if ((dataIn.getLength()-pos) < recordLength) {
-        readNextBlock(recordLength);
-        
-        // Sanity check
-        if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
-          throw new EOFException("Rec# " + recNo + ": Could read the next " +
-          		                   " record");
-        }
+            
+      return true;
+    }
+    
+    public boolean nextRawKey(DataInputBuffer key) throws IOException {
+      if (!positionToNextRecord(dataIn)) {
+        return false;
       }
-
-      // Setup the key and value
-      pos = dataIn.getPosition();
-      byte[] data = dataIn.getData();
-      key.reset(data, pos, keyLength);
-      value.reset(data, (pos + keyLength), valueLength);
-      
-      // Position for the next record
-      long skipped = dataIn.skip(recordLength);
-      if (skipped != recordLength) {
-        throw new IOException("Rec# " + recNo + ": Failed to skip past record " +
-        		                  "of length: " + recordLength);
+      if (keyBytes.length < currentKeyLength) {
+        keyBytes = new byte[currentKeyLength << 1];
       }
+      int i = readData(keyBytes, 0, currentKeyLength);
+      if (i != currentKeyLength) {
+        throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
+      }
+      key.reset(keyBytes, currentKeyLength);
+      bytesRead += currentKeyLength;
+      return true;
+    }
+    
+    public void nextRawValue(DataInputBuffer value) throws IOException {
+      final byte[] valBytes = (value.getData().length < currentValueLength)
+        ? new byte[currentValueLength << 1]
+        : value.getData();
+      int i = readData(valBytes, 0, currentValueLength);
+      if (i != currentValueLength) {
+        throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
+      }
+      value.reset(valBytes, currentValueLength);
       
       // Record the bytes read
-      bytesRead += recordLength;
+      bytesRead += currentValueLength;
 
       ++recNo;
       ++numRecordsRead;
-
-      return true;
     }
-
+    
     public void close() throws IOException {
       // Return the decompressor
       if (decompressor != null) {
@@ -458,7 +431,7 @@
   public static class InMemoryReader<K, V> extends Reader<K, V> {
     RamManager ramManager;
     TaskAttemptID taskAttemptId;
-    
+    DataInputBuffer memDataIn = new DataInputBuffer();
     public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
                           byte[] data, int start, int length)
                           throws IOException {
@@ -468,7 +441,7 @@
       
       buffer = data;
       bufferSize = (int)fileLength;
-      dataIn.reset(buffer, start, length);
+      memDataIn.reset(buffer, start, length);
     }
     
     @Override
@@ -497,58 +470,49 @@
       }
     }
     
-    public boolean next(DataInputBuffer key, DataInputBuffer value) 
-    throws IOException {
+    public boolean nextRawKey(DataInputBuffer key) throws IOException {
       try {
-      // Sanity check
-      if (eof) {
-        throw new EOFException("Completed reading " + bytesRead);
-      }
-      
-      // Read key and value lengths
-      int oldPos = dataIn.getPosition();
-      int keyLength = WritableUtils.readVInt(dataIn);
-      int valueLength = WritableUtils.readVInt(dataIn);
-      int pos = dataIn.getPosition();
-      bytesRead += pos - oldPos;
-      
-      // Check for EOF
-      if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
-        eof = true;
-        return false;
-      }
-      
-      // Sanity check
-      if (keyLength < 0) {
-        throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
-                              keyLength);
-      }
-      if (valueLength < 0) {
-        throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
-                              valueLength);
-      }
+        if (!positionToNextRecord(memDataIn)) {
+          return false;
+        }
+        // Setup the key
+        int pos = memDataIn.getPosition();
+        byte[] data = memDataIn.getData();
+        key.reset(data, pos, currentKeyLength);
+        // Position for the next value
+        long skipped = memDataIn.skip(currentKeyLength);
+        if (skipped != currentKeyLength) {
+          throw new IOException("Rec# " + recNo + 
+              ": Failed to skip past key of length: " + 
+              currentKeyLength);
+        }
 
-      final int recordLength = keyLength + valueLength;
-      
-      // Setup the key and value
-      pos = dataIn.getPosition();
-      byte[] data = dataIn.getData();
-      key.reset(data, pos, keyLength);
-      value.reset(data, (pos + keyLength), valueLength);
-      
-      // Position for the next record
-      long skipped = dataIn.skip(recordLength);
-      if (skipped != recordLength) {
-        throw new IOException("Rec# " + recNo + ": Failed to skip past record of length: " +
-                              recordLength);
+        // Record the byte
+        bytesRead += currentKeyLength;
+        return true;
+      } catch (IOException ioe) {
+        dumpOnError();
+        throw ioe;
       }
-      
-      // Record the byte
-      bytesRead += recordLength;
+    }
+    
+    public void nextRawValue(DataInputBuffer value) throws IOException {
+      try {
+        int pos = memDataIn.getPosition();
+        byte[] data = memDataIn.getData();
+        value.reset(data, pos, currentValueLength);
+
+        // Position for the next record
+        long skipped = memDataIn.skip(currentValueLength);
+        if (skipped != currentValueLength) {
+          throw new IOException("Rec# " + recNo + 
+              ": Failed to skip past value of length: " + 
+              currentValueLength);
+        }
+        // Record the byte
+        bytesRead += currentValueLength;
 
-      ++recNo;
-      
-      return true;
+        ++recNo;
       } catch (IOException ioe) {
         dumpOnError();
         throw ioe;
@@ -557,7 +521,7 @@
       
     public void close() {
       // Release
-      dataIn = null;
+      memDataIn = null;
       buffer = null;
       
       // Inform the RamManager
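
The hunk above replaces the block-buffered next(key, value) with positionToNextRecord()/nextRawKey()/nextRawValue(). As a reading aid, here is a minimal sketch of the record layout those methods walk. This is illustrative code, not part of the patch; it substitutes fixed 4-byte ints for the WritableUtils vints the real IFile format uses.

import java.io.DataInputStream;
import java.io.IOException;

class IFileRecordSketch {
  // Each record is <keyLen><valLen><key bytes><value bytes>; a record whose
  // two lengths are both the EOF marker terminates the stream.
  static final int EOF_MARKER = -1;

  int keyLen, valLen;

  // Mirrors positionToNextRecord(): consume only the two length headers,
  // leaving the stream positioned at the key bytes.
  boolean positionToNextRecord(DataInputStream in) throws IOException {
    keyLen = in.readInt();   // simplified stand-in for WritableUtils.readVInt
    valLen = in.readInt();
    if (keyLen == EOF_MARKER && valLen == EOF_MARKER) {
      return false;          // clean end of stream
    }
    if (keyLen < 0 || valLen < 0) {
      throw new IOException("Corrupt lengths: " + keyLen + "/" + valLen);
    }
    return true;
  }

  // Mirrors nextRawKey(): keys are read eagerly, since the merge needs them
  // to order segments.
  byte[] nextRawKey(DataInputStream in) throws IOException {
    byte[] key = new byte[keyLen];
    in.readFully(key);
    return key;
  }

  // Mirrors nextRawValue(): value bytes stay on the stream until the record
  // is actually consumed, which is the point of this change.
  byte[] nextRawValue(DataInputStream in) throws IOException {
    byte[] val = new byte[valLen];
    in.readFully(val);
    return val;
  }
}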

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=765062&r1=765061&r2=765062&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Wed Apr 15 06:13:59 2009
@@ -128,8 +128,7 @@
 
   public static class Segment<K extends Object, V extends Object> {
     Reader<K, V> reader = null;
-    DataInputBuffer key = new DataInputBuffer();
-    DataInputBuffer value = new DataInputBuffer();
+    final DataInputBuffer key = new DataInputBuffer();
     
     Configuration conf = null;
     FileSystem fs = null;
@@ -172,18 +171,30 @@
       }
     }
     
+    boolean inMemory() {
+      return fs == null;
+    }
+    
     DataInputBuffer getKey() { return key; }
-    DataInputBuffer getValue() { return value; }
+
+    DataInputBuffer getValue(DataInputBuffer value) throws IOException {
+      nextRawValue(value);
+      return value;
+    }
 
     long getLength() { 
       return (reader == null) ?
         segmentLength : reader.getLength();
     }
     
-    boolean next() throws IOException {
-      return reader.next(key, value);
+    boolean nextRawKey() throws IOException {
+      return reader.nextRawKey(key);
     }
-    
+
+    void nextRawValue(DataInputBuffer value) throws IOException {
+      reader.nextRawValue(value);
+    }
+
     void close() throws IOException {
       reader.close();
       
@@ -214,7 +225,8 @@
     Progressable reporter;
     
     DataInputBuffer key;
-    DataInputBuffer value;
+    final DataInputBuffer value = new DataInputBuffer();
+    final DataInputBuffer diskIFileValue = new DataInputBuffer();
     
     Segment<K, V> minSegment;
     Comparator<Segment<K, V>> segmentComparator =   
@@ -284,7 +296,7 @@
 
     private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
       long startPos = reader.getPosition();
-      boolean hasNext = reader.next();
+      boolean hasNext = reader.nextRawKey();
       long endPos = reader.getPosition();
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
@@ -311,10 +323,24 @@
         }
       }
       minSegment = top();
-      
+      if (!minSegment.inMemory()) {
+        //When we load the value from an inmemory segment, we reset
+        //the "value" DIB in this class to the inmem segment's byte[].
+        //When we load the value bytes from disk, we shouldn't use
+        //the same byte[] since it would corrupt the data in the inmem
+        //segment. So we maintain an explicit DIB for value bytes
+        //obtained from disk, and if the current segment is a disk
+        //segment, we reset the "value" DIB to the byte[] in that (so 
+        //we reuse the disk segment DIB whenever we consider
+        //a disk segment).
+        value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+      }
+      long startPos = minSegment.getPosition();
       key = minSegment.getKey();
-      value = minSegment.getValue();
-
+      minSegment.getValue(value);
+      long endPos = minSegment.getPosition();
+      totalBytesProcessed += endPos - startPos;
+      mergeProgress.set(totalBytesProcessed * progPerByte);
       return true;
     }
 
@@ -374,7 +400,7 @@
             // this helps in ensuring we don't use buffers until we need them
             segment.init(readsCounter);
             long startPos = segment.getPosition();
-            boolean hasNext = segment.next();
+            boolean hasNext = segment.nextRawKey();
             long endPos = segment.getPosition();
             startBytes += endPos - startPos;
             

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=765062&r1=765061&r2=765062&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 15 06:13:59 2009
@@ -2334,24 +2334,24 @@
         super(null, null, size, null, spilledRecordsCounter);
         this.kvIter = kvIter;
       }
-
-      public boolean next(DataInputBuffer key, DataInputBuffer value)
-          throws IOException {
+      public boolean nextRawKey(DataInputBuffer key) throws IOException {
         if (kvIter.next()) {
           final DataInputBuffer kb = kvIter.getKey();
-          final DataInputBuffer vb = kvIter.getValue();
           final int kp = kb.getPosition();
           final int klen = kb.getLength() - kp;
           key.reset(kb.getData(), kp, klen);
-          final int vp = vb.getPosition();
-          final int vlen = vb.getLength() - vp;
-          value.reset(vb.getData(), vp, vlen);
-          bytesRead += klen + vlen;
+          bytesRead += klen;
           return true;
         }
         return false;
       }
-
+      public void nextRawValue(DataInputBuffer value) throws IOException {
+        final DataInputBuffer vb = kvIter.getValue();
+        final int vp = vb.getPosition();
+        final int vlen = vb.getLength() - vp;
+        value.reset(vb.getData(), vp, vlen);
+        bytesRead += vlen;
+      }
       public long getPosition() throws IOException {
         return bytesRead;
       }


