hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r896243 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/fs/ src/test/core/org/apache/hadoop/fs/
Date Tue, 05 Jan 2010 22:14:19 GMT
Author: tomwhite
Date: Tue Jan  5 22:14:17 2010
New Revision: 896243

URL: http://svn.apache.org/viewvc?rev=896243&view=rev
Log:
HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass into user buffers.
Contributed by Todd Lipcon.

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue Jan  5 22:14:17 2010
@@ -83,6 +83,9 @@
     HADOOP-6443. Serialization classes accept invalid metadata.
     (Aaron Kimball via tomwhite)
 
+    HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass
+    into user buffers. (Todd Lipcon via tomwhite)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Tue Jan  5 22:14:17 2010
@@ -205,24 +205,41 @@
     @Override
     protected int readChunk(long pos, byte[] buf, int offset, int len,
         byte[] checksum) throws IOException {
+
       boolean eof = false;
-      if(needChecksum()) {
-        try {
-          long checksumPos = getChecksumFilePos(pos); 
-          if(checksumPos != sums.getPos()) {
-            sums.seek(checksumPos);
-          }
-          sums.readFully(checksum);
-        } catch (EOFException e) {
+      if (needChecksum()) {
+        assert checksum != null; // we have a checksum buffer
+        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
+        assert len >= bytesPerSum; // we must read at least one chunk
+
+        final int checksumsToRead = Math.min(
+          len/bytesPerSum, // number of checksums based on len to read
+          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
+        long checksumPos = getChecksumFilePos(pos); 
+        if(checksumPos != sums.getPos()) {
+          sums.seek(checksumPos);
+        }
+
+        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
+        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
+          throw new ChecksumException(
+            "Checksum file not a length multiple of checksum size " +
+            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
+            " sumLenread: " + sumLenRead,
+            pos);
+        }
+        if (sumLenRead <= 0) { // we're at the end of the file
           eof = true;
+        } else {
+          // Adjust amount of data to read based on how many checksum chunks we read
+          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
         }
-        len = bytesPerSum;
       }
       if(pos != datas.getPos()) {
         datas.seek(pos);
       }
       int nread = readFully(datas, buf, offset, len);
-      if( eof && nread > 0) {
+      if (eof && nread > 0) {
         throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
       }
       return nread;

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java Tue Jan  5 22:14:17 2010
@@ -200,21 +200,35 @@
         byte[] checksum) throws IOException {
       boolean eof = false;
       if (needChecksum()) {
-        try {
-          final long checksumPos = getChecksumFilePos(pos); 
-          if (checksumPos != sums.getPos()) {
-            sums.seek(checksumPos);
-          }
-          sums.readFully(checksum);
-        } catch (EOFException e) {
+        assert checksum != null; // we have a checksum buffer
+        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
+        assert len >= bytesPerSum; // we must read at least one chunk
+
+        final int checksumsToRead = Math.min(
+          len/bytesPerSum, // number of checksums based on len to read
+          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
+        long checksumPos = getChecksumFilePos(pos); 
+        if(checksumPos != sums.getPos()) {
+          sums.seek(checksumPos);
+        }
+
+        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
+        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
+          throw new EOFException("Checksum file not a length multiple of checksum size " +
+                                 "in " + file + " at " + pos + " checksumpos: " + checksumPos +
+                                 " sumLenread: " + sumLenRead );
+        }
+        if (sumLenRead <= 0) { // we're at the end of the file
           eof = true;
+        } else {
+          // Adjust amount of data to read based on how many checksum chunks we read
+          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
         }
-        len = bytesPerSum;
       }
       if (pos != datas.getPos()) {
         datas.seek(pos);
       }
-      final int nread = readFully(datas, buf, offset, len);
+      int nread = readFully(datas, buf, offset, len);
       if (eof && nread > 0) {
         throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
       }

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java Tue Jan  5 22:14:17 2010
@@ -24,6 +24,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
 
 /**
  * This is a generic input stream for verifying checksums for
@@ -38,16 +40,26 @@
   protected Path file;
   private Checksum sum;
   private boolean verifyChecksum = true;
-  private byte[] buf;
+  private int maxChunkSize; // data bytes for checksum (eg 512)
+  private byte[] buf; // buffer for non-chunk-aligned reading
   private byte[] checksum;
-  private int pos;
-  private int count;
+  private IntBuffer checksumInts; // wrapper on checksum buffer
+  private int pos; // the position of the reader inside buf
+  private int count; // the number of bytes currently in buf
   
   private int numOfRetries;
   
   // cached file position
+  // this should always be a multiple of maxChunkSize
   private long chunkPos = 0;
-  
+
+  // Number of checksum chunks that can be read at once into a user
+  // buffer. Chosen by benchmarks - higher values do not reduce
+  // CPU usage. The size of the data reads made to the underlying stream
+  // will be CHUNKS_PER_READ * maxChunkSize.
+  private static final int CHUNKS_PER_READ = 32;
+  protected static final int CHECKSUM_SIZE = 4; // 32-bit checksum
+
   /** Constructor
    * 
    * @param file The name of the file to be read
@@ -72,14 +84,34 @@
     set(verifyChecksum, sum, chunkSize, checksumSize);
   }
   
-  /** Reads in next checksum chunk data into <code>buf</code> at <code>offset</code>
+  /**
+   * Reads in checksum chunks into <code>buf</code> at <code>offset</code>
    * and checksum into <code>checksum</code>.
+   * Since checksums can be disabled, there are two cases implementors need
+   * to worry about:
+   *
+   *  (a) needChecksum() will return false:
+   *     - len can be any positive value
+   *     - checksum will be null
+   *     Implementors should simply pass through to the underlying data stream.
+   * or
+   *  (b) needChecksum() will return true:
+   *    - len >= maxChunkSize
+   *    - checksum.length is a multiple of CHECKSUM_SIZE
+   *    Implementors should read an integer number of data chunks into
+   *    buf. The amount read should be bounded by len or by 
+   *    checksum.length / CHECKSUM_SIZE * maxChunkSize. Note that len may
+   *    be a value that is not a multiple of maxChunkSize, in which case
+   *    the implementation may return less than len.
+   *
    * The method is used for implementing read, therefore, it should be optimized
-   * for sequential reading
+   * for sequential reading.
+   *
    * @param pos chunkPos
    * @param buf desitination buffer
    * @param offset offset in buf at which to store data
-   * @param len maximun number of bytes to read
+   * @param len maximum number of bytes to read
+   * @param checksum the data buffer into which to write checksums
    * @return number of bytes read
    */
   abstract protected int readChunk(long pos, byte[] buf, int offset, int len,
@@ -96,7 +128,7 @@
   protected synchronized boolean needChecksum() {
     return verifyChecksum && sum != null;
   }
-  
+
   /**
    * Read one checksum-verified byte
    * 
@@ -173,7 +205,7 @@
   private void fill(  ) throws IOException {
     assert(pos>=count);
     // fill internal buffer
-    count = readChecksumChunk(buf, 0, buf.length);
+    count = readChecksumChunk(buf, 0, maxChunkSize);
     if (count < 0) count = 0;
   }
   
@@ -185,13 +217,13 @@
   throws IOException {
     int avail = count-pos;
     if( avail <= 0 ) {
-      if(len>=buf.length) {
+      if(len >= maxChunkSize) {
         // read a chunk to user buffer directly; avoid one copy
         int nread = readChecksumChunk(b, off, len);
         return nread;
       } else {
         // read a chunk into the local buffer
-        fill();
+         fill();
         if( count <= 0 ) {
           return -1;
         } else {
@@ -207,10 +239,10 @@
     return cnt;    
   }
   
-  /* Read up one checksum chunk to array <i>b</i> at pos <i>off</i>
-   * It requires a checksum chunk boundary
+  /* Read up one or more checksum chunk to array <i>b</i> at pos <i>off</i>
+   * It requires at least one checksum chunk boundary
    * in between <cur_pos, cur_pos+len> 
-   * and it stops reading at the boundary or at the end of the stream;
+   * and it stops reading at the last boundary or at the end of the stream;
    * Otherwise an IllegalArgumentException is thrown.
    * This makes sure that all data read are checksum verified.
    * 
@@ -223,7 +255,7 @@
    *            the stream has been reached.
    * @throws IOException if an I/O error occurs.
    */ 
-  private int readChecksumChunk(byte b[], int off, int len)
+  private int readChecksumChunk(byte b[], final int off, final int len)
   throws IOException {
     // invalidate buffer
     count = pos = 0;
@@ -236,13 +268,12 @@
 
       try {
         read = readChunk(chunkPos, b, off, len, checksum);
-        if( read > 0 ) {
+        if( read > 0) {
           if( needChecksum() ) {
-            sum.update(b, off, read);
-            verifySum(chunkPos);
+            verifySums(b, off, read);
           }
           chunkPos += read;
-        } 
+        }
         retry = false;
       } catch (ChecksumException ce) {
           LOG.info("Found checksum error: b[" + off + ", " + (off+read) + "]="
@@ -266,26 +297,38 @@
     } while (retry);
     return read;
   }
-  
-  /* verify checksum for the chunk.
-   * @throws ChecksumException if there is a mismatch
-   */
-  private void verifySum(long errPos) throws ChecksumException {
-    long crc = getChecksum();
-    long sumValue = sum.getValue();
-    sum.reset();
-    if (crc != sumValue) {
-      throw new ChecksumException(
-          "Checksum error: "+file+" at "+errPos, errPos);
+
+  private void verifySums(final byte b[], final int off, int read)
+    throws ChecksumException
+  {
+    int leftToVerify = read;
+    int verifyOff = 0;
+    checksumInts.rewind();
+    checksumInts.limit((read - 1)/maxChunkSize + 1);
+
+    while (leftToVerify > 0) {
+      sum.update(b, off + verifyOff, Math.min(leftToVerify, maxChunkSize));
+      int expected = checksumInts.get();
+      int calculated = (int)sum.getValue();
+      sum.reset();
+
+      if (expected != calculated) {
+        long errPos = chunkPos + verifyOff;
+        throw new ChecksumException(
+          "Checksum error: "+file+" at "+ errPos +
+          " exp: " + expected + " got: " + calculated, errPos);
+      }
+      leftToVerify -= maxChunkSize;
+      verifyOff += maxChunkSize;
     }
   }
-  
-  /* calculate checksum value */
-  private long getChecksum() {
-    return checksum2long(checksum);
-  }
 
-  /** Convert a checksum byte array to a long */
+  /**
+   * Convert a checksum byte array to a long
+   * This is deprecated since 0.22 since it is no longer in use
+   * by this class.
+   */
+  @Deprecated
   static public long checksum2long(byte[] checksum) {
     long crc = 0L;
     for(int i=0; i<checksum.length; i++) {
@@ -293,7 +336,7 @@
     }
     return crc;
   }
-  
+
   @Override
   public synchronized long getPos() throws IOException {
     return chunkPos-Math.max(0L, count - pos);
@@ -399,11 +442,19 @@
    * @param checksumSize checksum size
    */
   final protected synchronized void set(boolean verifyChecksum,
-      Checksum sum, int maxChunkSize, int checksumSize ) {
+      Checksum sum, int maxChunkSize, int checksumSize) {
+
+    // The code makes assumptions that checksums are always 32-bit.
+    assert !verifyChecksum || sum == null || checksumSize == CHECKSUM_SIZE;
+
+    this.maxChunkSize = maxChunkSize;
     this.verifyChecksum = verifyChecksum;
     this.sum = sum;
     this.buf = new byte[maxChunkSize];
-    this.checksum = new byte[checksumSize];
+    // The size of the checksum array here determines how much we can
+    // read in a single call to readChunk
+    this.checksum = new byte[CHUNKS_PER_READ * checksumSize];
+    this.checksumInts = ByteBuffer.wrap(checksum).asIntBuffer();
     this.count = 0;
     this.pos = 0;
   }

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java Tue Jan  5 22:14:17 2010
@@ -26,6 +26,9 @@
 import junit.framework.TestCase;
 
 public class TestChecksumFileSystem extends TestCase {
+  static final String TEST_ROOT_DIR
+    = System.getProperty("test.build.data","build/test/data/work-dir/localfs");
+
   public void testgetChecksumLength() throws Exception {
     assertEquals(8, ChecksumFileSystem.getChecksumLength(0L, 512));
     assertEquals(12, ChecksumFileSystem.getChecksumLength(1L, 512));
@@ -38,10 +41,7 @@
                  ChecksumFileSystem.getChecksumLength(10000000000000L, 10));    
   } 
   
-  public void testVerifyChecksum() throws Exception {
-    String TEST_ROOT_DIR
-    = System.getProperty("test.build.data","build/test/data/work-dir/localfs");
-    
+  public void testVerifyChecksum() throws Exception {    
     Configuration conf = new Configuration();
     LocalFileSystem localFs = FileSystem.getLocal(conf);
     Path testPath = new Path(TEST_ROOT_DIR, "testPath");
@@ -54,9 +54,15 @@
     fout.write("testing you".getBytes());
     fout.close();
 
+    // Exercise some boundary cases - a divisor of the chunk size
+    // the chunk size, 2x chunk size, and +/-1 around these.
     TestLocalFileSystem.readFile(localFs, testPath, 128);
+    TestLocalFileSystem.readFile(localFs, testPath, 511);
     TestLocalFileSystem.readFile(localFs, testPath, 512);
+    TestLocalFileSystem.readFile(localFs, testPath, 513);
+    TestLocalFileSystem.readFile(localFs, testPath, 1023);
     TestLocalFileSystem.readFile(localFs, testPath, 1024);
+    TestLocalFileSystem.readFile(localFs, testPath, 1025);
 
     localFs.delete(localFs.getChecksumFile(testPath), true);
     assertTrue("checksum deleted", !localFs.exists(localFs.getChecksumFile(testPath)));
@@ -75,9 +81,80 @@
     assertTrue("error reading", errorRead);
     
     //now setting verify false, the read should succeed
-    localFs.setVerifyChecksum(false);
-    String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
-    assertTrue("read", "testing".equals(str));
+    try {
+      localFs.setVerifyChecksum(false);
+      String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
+      assertTrue("read", "testing".equals(str));
+    } finally {
+      // reset for other tests
+      localFs.setVerifyChecksum(true);
+    }
     
   }
+
+  public void testMultiChunkFile() throws Exception {
+    Configuration conf = new Configuration();
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
+    Path testPath = new Path(TEST_ROOT_DIR, "testMultiChunk");
+    FSDataOutputStream fout = localFs.create(testPath);
+    for (int i = 0; i < 1000; i++) {
+      fout.write(("testing" + i).getBytes());
+    }
+    fout.close();
+
+    // Exercise some boundary cases - a divisor of the chunk size
+    // the chunk size, 2x chunk size, and +/-1 around these.
+    TestLocalFileSystem.readFile(localFs, testPath, 128);
+    TestLocalFileSystem.readFile(localFs, testPath, 511);
+    TestLocalFileSystem.readFile(localFs, testPath, 512);
+    TestLocalFileSystem.readFile(localFs, testPath, 513);
+    TestLocalFileSystem.readFile(localFs, testPath, 1023);
+    TestLocalFileSystem.readFile(localFs, testPath, 1024);
+    TestLocalFileSystem.readFile(localFs, testPath, 1025);
+  }
+
+  /**
+   * Test to ensure that if the checksum file is truncated, a
+   * ChecksumException is thrown
+   */
+  public void testTruncatedChecksum() throws Exception { 
+    Configuration conf = new Configuration();
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
+    Path testPath = new Path(TEST_ROOT_DIR, "testtruncatedcrc");
+    FSDataOutputStream fout = localFs.create(testPath);
+    fout.write("testing truncation".getBytes());
+    fout.close();
+
+    // Read in the checksum
+    Path checksumFile = localFs.getChecksumFile(testPath);
+    FileSystem rawFs = localFs.getRawFileSystem();
+    FSDataInputStream checksumStream = rawFs.open(checksumFile);
+    byte buf[] = new byte[8192];
+    int read = checksumStream.read(buf, 0, buf.length);
+    checksumStream.close();
+
+    // Now rewrite the checksum file with the last byte missing
+    FSDataOutputStream replaceStream = rawFs.create(checksumFile);
+    replaceStream.write(buf, 0, read - 1);
+    replaceStream.close();
+
+    // Now reading the file should fail with a ChecksumException
+    try {
+      TestLocalFileSystem.readFile(localFs, testPath, 1024);
+      fail("Did not throw a ChecksumException when reading truncated " +
+           "crc file");
+    } catch(ChecksumException ie) {
+    }
+
+    // telling it not to verify checksums, should avoid issue.
+    try {
+      localFs.setVerifyChecksum(false);
+      String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
+      assertTrue("read", "testing truncation".equals(str));
+    } finally {
+      // reset for other tests
+      localFs.setVerifyChecksum(true);
+    }
+
+  }
 }



Mime
View raw message