hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1298641 [2/4] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/fs/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/region...
Date Thu, 08 Mar 2012 22:55:50 GMT
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Thu Mar  8 22:55:49 2012
@@ -29,17 +29,23 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompoundBloomFilter;
+import org.apache.hadoop.hbase.util.ChecksumFactory;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.IOUtils;
@@ -66,6 +72,12 @@ import com.google.common.base.Preconditi
  * <li>Uncompressed block size, header not included (4 bytes)
  * <li>The offset of the previous block of the same type (8 bytes). This is
  * used to be able to navigate to the previous block without going to the block
 * index.
+ * <li>For minorVersions >= 1, there is an additional 4-byte field,
+ * bytesPerChecksum, that records the number of bytes in a checksum chunk.
+ * <li>For minorVersions >= 1, there is a 4-byte value that stores the size of
+ * the data on disk (excluding the checksums).
+ * <li>For minorVersions >= 1, a series of 4-byte checksums, one for each
+ * bytesPerChecksum bytes of data.
  * <li>Compressed data (or uncompressed data if compression is disabled). The
  * compression algorithm is the same for all the blocks in the {@link HFile},
@@ -78,12 +90,32 @@ import com.google.common.base.Preconditi
 @InterfaceAudience.Private
 public class HFileBlock extends SchemaConfigured implements Cacheable {
 
+  /** Minor versions starting with this number have hbase checksums */
+  static final int MINOR_VERSION_WITH_CHECKSUM = 1;
+
+  /** minor version that does not support checksums */
+  static final int MINOR_VERSION_NO_CHECKSUM = 0;
+
+  /**
+   * On a checksum failure in a Reader, this many subsequent read
+   * requests switch back to using HDFS checksums before HBase checksum
+   * verification is automatically re-enabled.
+   */
+  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
+
+  /** The size of the block header when the minor version is 0 (no checksums) */
+  static final int HEADER_SIZE_NO_CHECKSUM = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
+      + Bytes.SIZEOF_LONG;
+
   public static final boolean FILL_HEADER = true;
   public static final boolean DONT_FILL_HEADER = false;
 
-  /** The size of a version 2 {@link HFile} block header */
-  public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
-      + Bytes.SIZEOF_LONG;
+  /** The size of a version 2 {@link HFile} block header, minor version 1.
+   * It adds a 1-byte checksum type, a 4-byte bytesPerChecksum, and another
+   * 4-byte value that stores the size of the data on disk.
+   */
+  static final int HEADER_SIZE = HEADER_SIZE_NO_CHECKSUM + Bytes.SIZEOF_BYTE +
+                                 2 * Bytes.SIZEOF_INT;
 
   /**
    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
@@ -93,7 +125,9 @@ public class HFileBlock extends SchemaCo
       + DataBlockEncoding.ID_SIZE;
 
   /** Just an array of bytes of the right size. */
-  public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
+  static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
+  static final byte[] DUMMY_HEADER_NO_CHECKSUM = 
+     new byte[HEADER_SIZE_NO_CHECKSUM];
 
   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
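For reference, the header sizes implied by these constants (assuming the usual 8-byte block-type magic, 4-byte ints and 8-byte longs; a reader's sketch, not text from the patch) work out as:

    // HEADER_SIZE_NO_CHECKSUM = 8 (magic) + 4 + 4 (on-disk / uncompressed sizes)
    //                           + 8 (prevBlockOffset)                    = 24 bytes
    // HEADER_SIZE             = 24 + 1 (checksumType) + 4 (bytesPerChecksum)
    //                           + 4 (onDiskDataSizeWithHeader)           = 33 bytes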
@@ -101,6 +135,11 @@ public class HFileBlock extends SchemaCo
   static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
       Bytes.SIZEOF_INT;
 
+  /**
+   * Each checksum value is an integer that can be stored in 4 bytes.
+   */
+  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
+
   private static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
         public HFileBlock deserialize(ByteBuffer buf) throws IOException{
@@ -109,7 +148,8 @@ public class HFileBlock extends SchemaCo
           buf.limit(buf.limit()
               - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
           newByteBuffer.put(buf);
-          HFileBlock ourBuffer = new HFileBlock(newByteBuffer);
+          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, 
+                                   MINOR_VERSION_NO_CHECKSUM);
 
           buf.position(buf.limit());
           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
@@ -120,10 +160,32 @@ public class HFileBlock extends SchemaCo
       };
 
   private BlockType blockType;
+
+  /** Size on disk without the header. It includes checksum data too. */
   private int onDiskSizeWithoutHeader;
+
+  /** Size of pure data. Does not include header or checksums */
   private final int uncompressedSizeWithoutHeader;
+
+  /** The offset of the previous block on disk */
   private final long prevBlockOffset;
+
+  /** The type of checksum; the byte code is stored rather than an object */
+  private final byte checksumType;
+
+  /** The number of bytes for which a checksum is computed */
+  private final int bytesPerChecksum;
+
+  /** Size on disk of header and data. Does not include checksum data */
+  private final int onDiskDataSizeWithHeader;
+
+  /** The minor version of the hfile. */
+  private final int minorVersion;
+
+  /** The in-memory representation of the hfile block */
   private ByteBuffer buf;
+
+  /** Whether there is a memstore timestamp after every key/value */
   private boolean includesMemstoreTS;
 
   /**
@@ -142,7 +204,7 @@ public class HFileBlock extends SchemaCo
   /**
    * Creates a new {@link HFile} block from the given fields. This constructor
    * is mostly used when the block data has already been read and uncompressed,
-   * and is sitting in a byte buffer.
+   * and is sitting in a byte buffer. 
    *
    * @param blockType the type of this block, see {@link BlockType}
    * @param onDiskSizeWithoutHeader compressed size of the block if compression
@@ -157,10 +219,17 @@ public class HFileBlock extends SchemaCo
    * @param fillHeader true to fill in the first {@link #HEADER_SIZE} bytes of
    *          the buffer based on the header fields provided
    * @param offset the file offset the block was read from
+   * @param minorVersion the minor version of this block
+   * @param bytesPerChecksum the number of bytes per checksum chunk
+   * @param checksumType the checksum algorithm to use
+   * @param onDiskDataSizeWithHeader size of header and data on disk not
+   *        including checksum data
    */
-  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
+  HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
       int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
-      boolean fillHeader, long offset, boolean includesMemstoreTS) {
+      boolean fillHeader, long offset, boolean includesMemstoreTS, 
+      int minorVersion, int bytesPerChecksum, byte checksumType,
+      int onDiskDataSizeWithHeader) {
     this.blockType = blockType;
     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
@@ -170,20 +239,37 @@ public class HFileBlock extends SchemaCo
       overwriteHeader();
     this.offset = offset;
     this.includesMemstoreTS = includesMemstoreTS;
+    this.minorVersion = minorVersion;
+    this.bytesPerChecksum = bytesPerChecksum;
+    this.checksumType = checksumType;
+    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
   }
 
   /**
    * Creates a block from an existing buffer starting with a header. Rewinds
    * and takes ownership of the buffer. By definition of rewind, ignores the
    * buffer position, but if you slice the buffer beforehand, it will rewind
-   * to that point.
+   * to that point. The reason this takes a minor version and not a major
+   * version is that major versions indicate the format of an HFile, whereas
+   * minor versions indicate the format inside an HFileBlock.
    */
-  private HFileBlock(ByteBuffer b) throws IOException {
+  HFileBlock(ByteBuffer b, int minorVersion) throws IOException {
     b.rewind();
     blockType = BlockType.read(b);
     onDiskSizeWithoutHeader = b.getInt();
     uncompressedSizeWithoutHeader = b.getInt();
     prevBlockOffset = b.getLong();
+    this.minorVersion = minorVersion;
+    if (minorVersion >= MINOR_VERSION_WITH_CHECKSUM) {
+      this.checksumType = b.get();
+      this.bytesPerChecksum = b.getInt();
+      this.onDiskDataSizeWithHeader = b.getInt();
+    } else {
+      this.checksumType = ChecksumType.NULL.getCode();
+      this.bytesPerChecksum = 0;
+      this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
+                                       HEADER_SIZE_NO_CHECKSUM;
+    }
     buf = b;
     buf.rewind();
   }
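The constructor above pulls bytesPerChecksum and onDiskDataSizeWithHeader out of the new header fields; the number of trailing checksum bytes follows from them. Below is a minimal sketch of that arithmetic, assuming one 4-byte checksum per bytesPerChecksum chunk of header-plus-data. The patch's own helpers, totalChecksumBytes() and ChecksumUtil.numBytes(), live elsewhere in this change; checksumBytesFor is a hypothetical name used only here.

    static int checksumBytesFor(int onDiskDataSizeWithHeader, int bytesPerChecksum) {
      if (bytesPerChecksum <= 0) {
        return 0; // e.g. cached blocks, which carry no checksum data
      }
      // one checksum chunk per bytesPerChecksum bytes, rounded up
      int numChunks = (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
      return numChunks * 4; // CHECKSUM_SIZE = 4 bytes per chunk
    }

For example, if header plus data total 64 KB and bytesPerChecksum is 16384, there are four chunks and therefore 16 bytes of checksum data appended to the block.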
@@ -198,25 +284,26 @@ public class HFileBlock extends SchemaCo
       throw new IllegalArgumentException("Querying encoder ID of a block " +
           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
     }
-    return buf.getShort(HEADER_SIZE);
+    return buf.getShort(headerSize());
   }
 
   /**
-   * @return the on-disk size of the block with header size included
+   * @return the on-disk size of the block with header size included. This
+   * includes the header, the data and the checksum data.
    */
-  public int getOnDiskSizeWithHeader() {
-    return onDiskSizeWithoutHeader + HEADER_SIZE;
+  int getOnDiskSizeWithHeader() {
+    return onDiskSizeWithoutHeader + headerSize();
   }
 
   /**
    * Returns the size of the compressed part of the block in case compression
    * is used, or the uncompressed size of the data part otherwise. Header size
-   * is not included.
+   * and checksum data size are not included.
    *
-   * @return the on-disk size of the data part of the block, header not
-   *         included
+   * @return the on-disk size of the data part of the block, header and
+   *         checksum not included. 
    */
-  public int getOnDiskSizeWithoutHeader() {
+  int getOnDiskSizeWithoutHeader() {
     return onDiskSizeWithoutHeader;
   }
 
@@ -224,7 +311,7 @@ public class HFileBlock extends SchemaCo
    * @return the uncompressed size of the data part of the block, header not
    *         included
    */
-  public int getUncompressedSizeWithoutHeader() {
+  public int getUncompressedSizeWithoutHeader() {
     return uncompressedSizeWithoutHeader;
   }
 
@@ -251,25 +338,27 @@ public class HFileBlock extends SchemaCo
   /**
    * Returns a buffer that does not include the header. The array offset points
    * to the start of the block data right after the header. The underlying data
-   * array is not copied.
+   * array is not copied. Checksum data is not included in the returned buffer.
    *
    * @return the buffer with header skipped
    */
-  public ByteBuffer getBufferWithoutHeader() {
-    return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + HEADER_SIZE,
-        buf.limit() - HEADER_SIZE).slice();
+  ByteBuffer getBufferWithoutHeader() {
+    return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
+        buf.limit() - headerSize() - totalChecksumBytes()).slice();
   }
 
   /**
    * Returns the buffer this block stores internally. The clients must not
    * modify the buffer object. This method has to be public because it is
    * used in {@link CompoundBloomFilter} to avoid object creation on every
-   * Bloom filter lookup, but has to be used with caution.
+   * Bloom filter lookup, but has to be used with caution. Checksum data
+   * is not included in the returned buffer.
    *
    * @return the buffer of this block for read-only operations
    */
   public ByteBuffer getBufferReadOnly() {
-    return buf;
+    return ByteBuffer.wrap(buf.array(), buf.arrayOffset(),
+        buf.limit() - totalChecksumBytes()).slice();
   }
 
   /**
@@ -278,7 +367,7 @@ public class HFileBlock extends SchemaCo
    *
    * @return the byte buffer with header included
    */
-  public ByteBuffer getBufferWithHeader() {
+  ByteBuffer getBufferWithHeader() {
     ByteBuffer dupBuf = buf.duplicate();
     dupBuf.rewind();
     return dupBuf;
@@ -288,11 +377,11 @@ public class HFileBlock extends SchemaCo
    * Deserializes fields of the given writable using the data portion of this
    * block. Does not check that all the block data has been read.
    */
-  public void readInto(Writable w) throws IOException {
+  void readInto(Writable w) throws IOException {
     Preconditions.checkNotNull(w);
 
-    if (Writables.getWritable(buf.array(), buf.arrayOffset() + HEADER_SIZE,
-        buf.limit() - HEADER_SIZE, w) == null) {
+    if (Writables.getWritable(buf.array(), buf.arrayOffset() + headerSize(),
+        buf.limit() - headerSize(), w) == null) {
       throw new IOException("Failed to deserialize block " + this + " into a "
           + w.getClass().getSimpleName());
     }
@@ -330,8 +419,17 @@ public class HFileBlock extends SchemaCo
         "uncompressedSizeWithoutHeader");
 
     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlocKOffset");
+    if (minorVersion >= MINOR_VERSION_WITH_CHECKSUM) {
+      sanityCheckAssertion(buf.get(), checksumType, "checksumType");
+      sanityCheckAssertion(buf.getInt(), bytesPerChecksum, "bytesPerChecksum");
+      sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, 
+                           "onDiskDataSizeWithHeader");
+    }
 
-    int expectedBufLimit = uncompressedSizeWithoutHeader + HEADER_SIZE;
+    int cksumBytes = totalChecksumBytes();
+    int hdrSize = headerSize();
+    int expectedBufLimit = uncompressedSizeWithoutHeader + hdrSize +
+                           cksumBytes;
     if (buf.limit() != expectedBufLimit) {
       throw new AssertionError("Expected buffer limit " + expectedBufLimit
           + ", got " + buf.limit());
@@ -339,11 +437,11 @@ public class HFileBlock extends SchemaCo
 
     // We might optionally allocate HEADER_SIZE more bytes to read the next
     // block's, header, so there are two sensible values for buffer capacity.
-    if (buf.capacity() != uncompressedSizeWithoutHeader + HEADER_SIZE &&
-        buf.capacity() != uncompressedSizeWithoutHeader + 2 * HEADER_SIZE) {
+    int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes;
+    if (buf.capacity() != size &&
+        buf.capacity() != size + hdrSize) {
       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
-          ", expected " + (uncompressedSizeWithoutHeader + HEADER_SIZE) +
-          " or " + (uncompressedSizeWithoutHeader + 2 * HEADER_SIZE));
+          ", expected " + size + " or " + (size + hdrSize));
     }
   }
 
@@ -358,8 +456,8 @@ public class HFileBlock extends SchemaCo
         + ", prevBlockOffset="
         + prevBlockOffset
         + ", dataBeginsWith="
-        + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + HEADER_SIZE,
-            Math.min(32, buf.limit() - buf.arrayOffset() - HEADER_SIZE))
+        + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
+            Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()))
         + ", fileOffset=" + offset;
   }
 
@@ -379,31 +477,36 @@ public class HFileBlock extends SchemaCo
 
   /**
    * Always allocates a new buffer of the correct size. Copies header bytes
-   * from the existing buffer. Does not change header fields.
+   * from the existing buffer. Does not change header fields. 
+   * Also reserves room for the checksum bytes.
    *
    * @param extraBytes whether to reserve room in the buffer to read the next
    *          block's header
    */
   private void allocateBuffer(boolean extraBytes) {
-    int capacityNeeded = HEADER_SIZE + uncompressedSizeWithoutHeader +
-        (extraBytes ? HEADER_SIZE : 0);
+    int cksumBytes = totalChecksumBytes();
+    int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader +
+        cksumBytes +
+        (extraBytes ? headerSize() : 0);
 
     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
 
     // Copy header bytes.
     System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
-        newBuf.arrayOffset(), HEADER_SIZE);
+        newBuf.arrayOffset(), headerSize());
 
     buf = newBuf;
-    buf.limit(HEADER_SIZE + uncompressedSizeWithoutHeader);
+    buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes);
   }
 
   /** An additional sanity-check in case no compression is being used. */
   public void assumeUncompressed() throws IOException {
-    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader) {
+    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + 
+        totalChecksumBytes()) {
       throw new IOException("Using no compression but "
           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
-          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader);
+          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
+          + ", numChecksumbytes=" + totalChecksumBytes());
     }
   }
 
@@ -432,7 +535,7 @@ public class HFileBlock extends SchemaCo
    */
   public DataInputStream getByteStream() {
     return new DataInputStream(new ByteArrayInputStream(buf.array(),
-        buf.arrayOffset() + HEADER_SIZE, buf.limit() - HEADER_SIZE));
+        buf.arrayOffset() + headerSize(), buf.limit() - headerSize()));
   }
 
   @Override
@@ -443,7 +546,10 @@ public class HFileBlock extends SchemaCo
         // Block type and byte buffer references
         2 * ClassSize.REFERENCE +
         // On-disk size, uncompressed size, and next block's on-disk size
-        3 * Bytes.SIZEOF_INT +
+        // bytesPerChecksum, onDiskDataSizeWithHeader and minorVersion
+        6 * Bytes.SIZEOF_INT +
+        // Checksum type
+        1 * Bytes.SIZEOF_BYTE +
         // This and previous block offset
         2 * Bytes.SIZEOF_LONG +
         // "Include memstore timestamp" flag
@@ -570,14 +676,30 @@ public class HFileBlock extends SchemaCo
 
     /**
      * Bytes to be written to the file system, including the header. Compressed
-     * if compression is turned on.
+     * if compression is turned on. It also includes the checksum data that 
+     * immediately follows the block data. (header + data + checksums)
      */
     private byte[] onDiskBytesWithHeader;
 
     /**
+     * The size of the data on disk that does not include the checksums.
+     * (header + data)
+     */
+    private int onDiskDataSizeWithHeader;
+
+    /**
+     * The size of the checksum data on disk. It is used only if data is
+     * not compressed. If data is compressed, then the checksums are already
+     * part of onDiskBytesWithHeader. If data is uncompressed, then this
+     * variable stores the checksum data for this block.
+     */
+    private byte[] onDiskChecksum;
+
+    /**
      * Valid in the READY state. Contains the header and the uncompressed (but
      * potentially encoded, if this is a data block) bytes, so the length is
      * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE}.
+     * Does not store checksums.
      */
     private byte[] uncompressedBytesWithHeader;
 
@@ -599,12 +721,19 @@ public class HFileBlock extends SchemaCo
     /** Whether we are including memstore timestamp after every key/value */
     private boolean includesMemstoreTS;
 
+    /** Checksum settings */
+    private ChecksumType checksumType;
+    private int bytesPerChecksum;
+
     /**
      * @param compressionAlgorithm compression algorithm to use
      * @param dataBlockEncoderAlgo data block encoding algorithm to use
+     * @param checksumType type of checksum
+     * @param bytesPerChecksum bytes per checksum
      */
     public Writer(Compression.Algorithm compressionAlgorithm,
-          HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS) {
+          HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS,
+          ChecksumType checksumType, int bytesPerChecksum) {
       compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
       this.dataBlockEncoder = dataBlockEncoder != null
           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
@@ -622,12 +751,19 @@ public class HFileBlock extends SchemaCo
               "for algorithm " + compressionAlgorithm, e);
         }
       }
+      if (bytesPerChecksum < HEADER_SIZE) {
+        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
+            " Minimum is " + HEADER_SIZE + " but the configured value is " +
+            bytesPerChecksum);
+      }
       
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
         prevOffsetByType[i] = -1;
 
       this.includesMemstoreTS = includesMemstoreTS;
+      this.checksumType = checksumType;
+      this.bytesPerChecksum = bytesPerChecksum;
     }
 
     /**
@@ -703,16 +839,18 @@ public class HFileBlock extends SchemaCo
       state = State.BLOCK_READY;
       encodeDataBlockForDisk();
 
-      doCompression();
-      putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length,
-          uncompressedBytesWithHeader.length);
+      doCompressionAndChecksumming();
     }
 
     /**
      * Do compression if it is enabled, or re-use the uncompressed buffer if
      * it is not. Fills in the compressed block's header if doing compression.
+     * Also computes the checksums. When compression is disabled, the
+     * checksums are written to their own separate array, onDiskChecksum.
+     * When compression is enabled, the checksums are appended to the
+     * compressed output byte stream.
      */
-    private void doCompression() throws IOException {
+    private void doCompressionAndChecksumming() throws IOException {
       // do the compression
       if (compressAlgo != NONE) {
         compressedByteStream.reset();
@@ -726,11 +864,53 @@ public class HFileBlock extends SchemaCo
         compressionStream.flush();
         compressionStream.finish();
 
+        // generate checksums
+        onDiskDataSizeWithHeader = compressedByteStream.size(); // data size
+
+        // reserve space for checksums in the output byte stream
+        ChecksumUtil.reserveSpaceForChecksums(compressedByteStream, 
+          onDiskDataSizeWithHeader, bytesPerChecksum);
+
+
         onDiskBytesWithHeader = compressedByteStream.toByteArray();
         putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
-            uncompressedBytesWithHeader.length);
+            uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
+
+        // generate checksums for header and data. The checksums are
+        // part of onDiskBytesWithHeader itself.
+        ChecksumUtil.generateChecksums(
+          onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
+          onDiskBytesWithHeader, onDiskDataSizeWithHeader,
+          checksumType, bytesPerChecksum);
+
+        // Checksums are already part of onDiskBytesWithHeader
+        onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
+
+        //set the header for the uncompressed bytes (for cache-on-write)
+        putHeader(uncompressedBytesWithHeader, 0,
+          onDiskBytesWithHeader.length + onDiskChecksum.length,
+          uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
+
       } else {
+        // If we are not using any compression, then the
+        // checksums are written to their own array, onDiskChecksum.
         onDiskBytesWithHeader = uncompressedBytesWithHeader;
+
+        onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
+        int numBytes = (int)ChecksumUtil.numBytes(
+                          uncompressedBytesWithHeader.length,
+                          bytesPerChecksum);
+        onDiskChecksum = new byte[numBytes];
+
+        //set the header for the uncompressed bytes
+        putHeader(uncompressedBytesWithHeader, 0,
+          onDiskBytesWithHeader.length + onDiskChecksum.length,
+          uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
+
+        ChecksumUtil.generateChecksums(
+          uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
+          onDiskChecksum, 0,
+          checksumType, bytesPerChecksum);
       }
     }
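Summarizing the two paths above, the bytes that eventually reach the file system (see writeHeaderAndData below) are laid out roughly as follows; this is a reader's sketch of the layout the comments describe, not text from the patch:

    // compressed:    onDiskBytesWithHeader = [ header | compressed data | checksums ]
    //                onDiskChecksum        = empty (HConstants.EMPTY_BYTE_ARRAY)
    // uncompressed:  onDiskBytesWithHeader = [ header | data ]
    //                onDiskChecksum        = [ checksums ], written immediately after it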
 
@@ -749,7 +929,7 @@ public class HFileBlock extends SchemaCo
           HEADER_SIZE).slice();
       Pair<ByteBuffer, BlockType> encodingResult =
           dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
-              includesMemstoreTS);
+              includesMemstoreTS, DUMMY_HEADER);
 
       BlockType encodedBlockType = encodingResult.getSecond();
       if (encodedBlockType == BlockType.ENCODED_DATA) {
@@ -772,16 +952,21 @@ public class HFileBlock extends SchemaCo
 
     /**
      * Put the header into the given byte array at the given offset.
-     * @param onDiskSize size of the block on disk
+     * @param onDiskSize size of the block on disk (header + data + checksums)
      * @param uncompressedSize size of the block after decompression (but
-     *          before optional data block decoding)
+     *          before optional data block decoding) including header
+     * @param onDiskDataSize size of the block on disk with header
+     *        and data but not including the checksums
      */
     private void putHeader(byte[] dest, int offset, int onDiskSize,
-        int uncompressedSize) {
+        int uncompressedSize, int onDiskDataSize) {
       offset = blockType.put(dest, offset);
       offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
       offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
-      Bytes.putLong(dest, offset, prevOffset);
+      offset = Bytes.putLong(dest, offset, prevOffset);
+      offset = Bytes.putByte(dest, offset, checksumType.getCode());
+      offset = Bytes.putInt(dest, offset, bytesPerChecksum);
+      offset = Bytes.putInt(dest, offset, onDiskDataSizeWithHeader);
     }
 
     /**
@@ -816,19 +1001,45 @@ public class HFileBlock extends SchemaCo
     private void writeHeaderAndData(DataOutputStream out) throws IOException {
       ensureBlockReady();
       out.write(onDiskBytesWithHeader);
+      if (compressAlgo == NONE) {
+        if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
+          throw new IOException("A " + blockType 
+              + " without compression should have checksums " 
+              + " stored separately.");
+        }
+        out.write(onDiskChecksum);
+      }
     }
 
     /**
      * Returns the header or the compressed data (or uncompressed data when not
      * using compression) as a byte array. Can be called in the "writing" state
      * or in the "block ready" state. If called in the "writing" state,
-     * transitions the writer to the "block ready" state.
+     * transitions the writer to the "block ready" state. This returns
+     * the header + data + checksums stored on disk.
      *
      * @return header and data as they would be stored on disk in a byte array
      * @throws IOException
      */
-    public byte[] getHeaderAndData() throws IOException {
+    byte[] getHeaderAndDataForTest() throws IOException {
       ensureBlockReady();
+      if (compressAlgo == NONE) {
+        if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
+          throw new IOException("A " + blockType 
+              + " without compression should have checksums " 
+              + " stored separately.");
+        }
+        // This is not very optimal, because we are doing an extra copy.
+        // But this method is used only by unit tests.
+        byte[] output = new byte[onDiskBytesWithHeader.length +
+                                 onDiskChecksum.length];
+        System.arraycopy(onDiskBytesWithHeader, 0,
+                         output, 0, onDiskBytesWithHeader.length);
+        System.arraycopy(onDiskChecksum, 0,
+                         output, onDiskBytesWithHeader.length,
+                         onDiskChecksum.length);
+        return output;
+      }
       return onDiskBytesWithHeader;
     }
 
@@ -851,9 +1062,9 @@ public class HFileBlock extends SchemaCo
      *
      * @return the on-disk size of the block, not including the header.
      */
-    public int getOnDiskSizeWithoutHeader() {
+    int getOnDiskSizeWithoutHeader() {
       expectState(State.BLOCK_READY);
-      return onDiskBytesWithHeader.length - HEADER_SIZE;
+      return onDiskBytesWithHeader.length + onDiskChecksum.length - HEADER_SIZE;
     }
 
     /**
@@ -861,17 +1072,17 @@ public class HFileBlock extends SchemaCo
      * "block ready" state.
      *
      * @return the on-disk size of the block ready to be written, including the
-     *         header size
+     *         header size, the data and the checksum data.
      */
-    public int getOnDiskSizeWithHeader() {
+    int getOnDiskSizeWithHeader() {
       expectState(State.BLOCK_READY);
-      return onDiskBytesWithHeader.length;
+      return onDiskBytesWithHeader.length + onDiskChecksum.length;
     }
 
     /**
      * The uncompressed size of the block data. Does not include header size.
      */
-    public int getUncompressedSizeWithoutHeader() {
+    int getUncompressedSizeWithoutHeader() {
       expectState(State.BLOCK_READY);
       return uncompressedBytesWithHeader.length - HEADER_SIZE;
     }
@@ -879,7 +1090,7 @@ public class HFileBlock extends SchemaCo
     /**
      * The uncompressed size of the block data, including header size.
      */
-    public int getUncompressedSizeWithHeader() {
+    int getUncompressedSizeWithHeader() {
       expectState(State.BLOCK_READY);
       return uncompressedBytesWithHeader.length;
     }
@@ -906,13 +1117,13 @@ public class HFileBlock extends SchemaCo
      * Returns the header followed by the uncompressed data, even if using
      * compression. This is needed for storing uncompressed blocks in the block
      * cache. Can be called in the "writing" state or the "block ready" state.
+     * Returns only the header and data, does not include checksum data.
      *
      * @return uncompressed block bytes for caching on write
      */
-    private byte[] getUncompressedDataWithHeader() {
+    ByteBuffer getUncompressedBufferWithHeader() {
       expectState(State.BLOCK_READY);
-
-      return uncompressedBytesWithHeader;
+      return ByteBuffer.wrap(uncompressedBytesWithHeader);
     }
 
     private void expectState(State expectedState) {
@@ -923,17 +1134,6 @@ public class HFileBlock extends SchemaCo
     }
 
     /**
-     * Similar to {@link #getUncompressedBufferWithHeader()} but returns a byte
-     * buffer.
-     *
-     * @return uncompressed block for caching on write in the form of a buffer
-     */
-    public ByteBuffer getUncompressedBufferWithHeader() {
-      byte[] b = getUncompressedDataWithHeader();
-      return ByteBuffer.wrap(b, 0, b.length);
-    }
-
-    /**
      * Takes the given {@link BlockWritable} instance, creates a new block of
      * its appropriate type, writes the writable into this block, and flushes
      * the block into the output stream. The writer is instructed not to buffer
@@ -949,13 +1149,21 @@ public class HFileBlock extends SchemaCo
       writeHeaderAndData(out);
     }
 
+    /**
+     * Creates a new HFileBlock. Checksums have already been validated, so
+     * the byte buffer passed into the constructor of this newly created
+     * block does not have checksum data even though the header minor 
+     * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
+     * 0 value in bytesPerChecksum.
+     */
     public HFileBlock getBlockForCaching() {
       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
           getUncompressedSizeWithoutHeader(), prevOffset,
           getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
-          includesMemstoreTS);
+          includesMemstoreTS, MINOR_VERSION_WITH_CHECKSUM,
+          0, ChecksumType.NULL.getCode(),  // no checksums in cached data
+          onDiskBytesWithHeader.length + onDiskChecksum.length);
     }
-
   }
 
   /** Something that can be written into a block. */
@@ -1024,10 +1232,15 @@ public class HFileBlock extends SchemaCo
    * A common implementation of some methods of {@link FSReader} and some
    * tools for implementing HFile format version-specific block readers.
    */
-  public abstract static class AbstractFSReader implements FSReader {
+  private abstract static class AbstractFSReader implements FSReader {
 
-    /** The file system stream of the underlying {@link HFile} */
-    protected FSDataInputStream istream;
+    /** The file system stream of the underlying {@link HFile} that 
+     * does checksum validations in the filesystem */
+    protected final FSDataInputStream istream;
+
+    /** The file system stream of the underlying {@link HFile} that
+     * does not do checksum verification in the file system */
+    protected final FSDataInputStream istreamNoFsChecksum;
 
     /** Compression algorithm used by the {@link HFile} */
     protected Compression.Algorithm compressAlgo;
@@ -1035,14 +1248,34 @@ public class HFileBlock extends SchemaCo
     /** The size of the file we are reading from, or -1 if unknown. */
     protected long fileSize;
 
+    /** The minor version of this reader */
+    private int minorVersion;
+
+    /** The size of the header */
+    protected int hdrSize;
+
+    /** The filesystem used to access data */
+    protected HFileSystem hfs;
+
+    /** The path (if any) where this data is coming from */
+    protected Path path;
+
     /** The default buffer size for our buffered streams */
     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
 
-    public AbstractFSReader(FSDataInputStream istream, Algorithm compressAlgo,
-        long fileSize) {
+    public AbstractFSReader(FSDataInputStream istream, 
+        FSDataInputStream istreamNoFsChecksum,
+        Algorithm compressAlgo,
+        long fileSize, int minorVersion, HFileSystem hfs, Path path) 
+        throws IOException {
       this.istream = istream;
       this.compressAlgo = compressAlgo;
       this.fileSize = fileSize;
+      this.minorVersion = minorVersion;
+      this.hfs = hfs;
+      this.path = path;
+      this.hdrSize = headerSize(minorVersion);
+      this.istreamNoFsChecksum = istreamNoFsChecksum;
     }
 
     @Override
@@ -1083,25 +1316,27 @@ public class HFileBlock extends SchemaCo
      * @param peekIntoNextBlock whether to read the next block's on-disk size
      * @param fileOffset position in the stream to read at
      * @param pread whether we should do a positional read
+     * @param istream The input source of data
      * @return the on-disk size of the next block with header size included, or
      *         -1 if it could not be determined
      * @throws IOException
      */
-    protected int readAtOffset(byte[] dest, int destOffset, int size,
+    protected int readAtOffset(FSDataInputStream istream,
+        byte[] dest, int destOffset, int size,
         boolean peekIntoNextBlock, long fileOffset, boolean pread)
         throws IOException {
       if (peekIntoNextBlock &&
-          destOffset + size + HEADER_SIZE > dest.length) {
+          destOffset + size + hdrSize > dest.length) {
         // We are asked to read the next block's header as well, but there is
         // not enough room in the array.
         throw new IOException("Attempted to read " + size + " bytes and " +
-            HEADER_SIZE + " bytes of next header into a " + dest.length +
+            hdrSize + " bytes of next header into a " + dest.length +
             "-byte array at offset " + destOffset);
       }
 
       if (pread) {
         // Positional read. Better for random reads.
-        int extraSize = peekIntoNextBlock ? HEADER_SIZE : 0;
+        int extraSize = peekIntoNextBlock ? hdrSize : 0;
 
         int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
         if (ret < size) {
@@ -1131,14 +1366,14 @@ public class HFileBlock extends SchemaCo
           }
 
           // Try to read the next block header.
-          if (!readWithExtra(istream, dest, destOffset, size, HEADER_SIZE))
+          if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
             return -1;
         }
       }
 
       assert peekIntoNextBlock;
       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) +
-          HEADER_SIZE;
+          hdrSize;
     }
 
     /**
@@ -1149,14 +1384,12 @@ public class HFileBlock extends SchemaCo
      * @param bufferedBoundedStream
      *          a stream to read compressed data from, bounded to the exact
      *          amount of compressed data
-     * @param compressedSize
-     *          compressed data size, header not included
      * @param uncompressedSize
      *          uncompressed data size, header not included
      * @throws IOException
      */
     protected void decompress(byte[] dest, int destOffset,
-        InputStream bufferedBoundedStream, int compressedSize,
+        InputStream bufferedBoundedStream,
         int uncompressedSize) throws IOException {
       Decompressor decompressor = null;
       try {
@@ -1189,6 +1422,12 @@ public class HFileBlock extends SchemaCo
           offset, size, pread), Math.min(DEFAULT_BUFFER_SIZE, size));
     }
 
+    /**
+     * @return The minorVersion of this HFile
+     */
+    protected int getMinorVersion() {
+      return minorVersion;
+    }
   }
 
   /**
@@ -1198,14 +1437,15 @@ public class HFileBlock extends SchemaCo
    * reader returns blocks represented in the uniform version 2 format in
    * memory.
    */
-  public static class FSReaderV1 extends AbstractFSReader {
+  static class FSReaderV1 extends AbstractFSReader {
 
     /** Header size difference between version 1 and 2 */
-    private static final int HEADER_DELTA = HEADER_SIZE - MAGIC_LENGTH;
+    private static final int HEADER_DELTA = HEADER_SIZE_NO_CHECKSUM - 
+                                            MAGIC_LENGTH;
 
     public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
-        long fileSize) {
-      super(istream, compressAlgo, fileSize);
+        long fileSize) throws IOException {
+      super(istream, istream, compressAlgo, fileSize, 0, null, null);
     }
 
     /**
@@ -1264,7 +1504,7 @@ public class HFileBlock extends SchemaCo
 
         // The first MAGIC_LENGTH bytes of what this will read will be
         // overwritten.
-        readAtOffset(buf.array(), buf.arrayOffset() + HEADER_DELTA,
+        readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA,
             onDiskSize, false, offset, pread);
 
         onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
@@ -1272,7 +1512,7 @@ public class HFileBlock extends SchemaCo
         InputStream bufferedBoundedStream = createBufferedBoundedStream(
             offset, onDiskSize, pread);
         decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA,
-            bufferedBoundedStream, onDiskSize, uncompressedSizeWithMagic);
+            bufferedBoundedStream, uncompressedSizeWithMagic);
 
         // We don't really have a good way to exclude the "magic record" size
         // from the compressed block's size, since it is compressed as well.
@@ -1287,7 +1527,8 @@ public class HFileBlock extends SchemaCo
       // since the magic record gets moved to the header.
       HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
           uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
-          offset, MemStore.NO_PERSISTENT_TS);
+          offset, MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(),
+          onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM);
       return b;
     }
   }
@@ -1303,7 +1544,20 @@ public class HFileBlock extends SchemaCo
   }
 
   /** Reads version 2 blocks from the filesystem. */
-  public static class FSReaderV2 extends AbstractFSReader {
+  static class FSReaderV2 extends AbstractFSReader {
+
+    // The configuration states that we should validate hbase checksums
+    private final boolean useHBaseChecksumConfigured;
+
+    // Records the current state of this reader with respect to
+    // validating checksums in HBase. It is initially set to the same
+    // value as useHBaseChecksumConfigured, but can change state when
+    // we encounter checksum verification failures.
+    private volatile boolean useHBaseChecksum;
+
+    // After a checksum failure, do this many subsequent
+    // reads without HBase checksum verification.
+    private volatile int checksumOffCount = -1;
 
     /** Whether we include memstore timestamp in data blocks */
     protected boolean includesMemstoreTS;
@@ -1320,9 +1574,40 @@ public class HFileBlock extends SchemaCo
           }
         };
 
-    public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
-        long fileSize) {
-      super(istream, compressAlgo, fileSize);
+    public FSReaderV2(FSDataInputStream istream, 
+        FSDataInputStream istreamNoFsChecksum, Algorithm compressAlgo,
+        long fileSize, int minorVersion, HFileSystem hfs, Path path) 
+      throws IOException {
+      super(istream, istreamNoFsChecksum, compressAlgo, fileSize, 
+            minorVersion, hfs, path);
+
+      if (hfs != null) {
+        // Check the configuration to determine whether hbase-level
+        // checksum verification is needed or not.
+        useHBaseChecksum = hfs.useHBaseChecksum();
+      } else {
+        // The configuration does not specify anything about hbase checksum
+        // validations. Set it to true here assuming that we will verify
+        // hbase checksums for all reads. For older files that do not have 
+        // stored checksums, this flag will be reset later.
+        useHBaseChecksum = true;
+      }
+
+      // for older versions, hbase did not store checksums.
+      if (getMinorVersion() < MINOR_VERSION_WITH_CHECKSUM) {
+        useHBaseChecksum = false;
+      }
+      this.useHBaseChecksumConfigured = useHBaseChecksum;
+    }
+
+    /**
+     * A constructor that reads files with the latest minor version.
+     * This is used by unit tests only.
+     */
+    FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
+        long fileSize) throws IOException {
+      this(istream, istream, compressAlgo, fileSize, 
+           HFileReaderV2.MAX_MINOR_VERSION, null, null);
     }
 
     /**
@@ -1339,6 +1624,101 @@ public class HFileBlock extends SchemaCo
     @Override
     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
         int uncompressedSize, boolean pread) throws IOException {
+
+      // It is ok to get a reference to the stream here without any
+      // locks because it is marked final.
+      FSDataInputStream is = this.istreamNoFsChecksum;
+
+      // Get a copy of the current state of whether to validate
+      // HBase checksums or not for this read call. This is not
+      // thread-safe, but the one constraint is that if we decide
+      // to skip HBase checksum verification then we are
+      // guaranteed to use HDFS checksum verification.
+      boolean doVerificationThruHBaseChecksum = this.useHBaseChecksum;
+      if (!doVerificationThruHBaseChecksum) {
+        is = this.istream;
+      }
+                     
+      HFileBlock blk = readBlockDataInternal(is, offset, 
+                         onDiskSizeWithHeaderL, 
+                         uncompressedSize, pread,
+                         doVerificationThruHBaseChecksum);
+      if (blk == null) {
+        HFile.LOG.warn("HBase checksum verification failed for file " +
+                       path + " at offset " +
+                       offset + " filesize " + fileSize +
+                       ". Retrying read with HDFS checksums turned on...");
+
+        if (!doVerificationThruHBaseChecksum) {
+          String msg = "HBase checksum verification failed for file " +
+                       path + " at offset " +
+                       offset + " filesize " + fileSize + 
+                       " but this cannot happen because doVerify is " +
+                       doVerificationThruHBaseChecksum;
+          HFile.LOG.warn(msg);
+          throw new IOException(msg); // cannot happen case here
+        }
+        HFile.checksumFailures.incrementAndGet(); // update metrics
+ 
+        // If we have a checksum failure, we fall back into a mode where
+        // the next few reads use HDFS level checksums. We aim to make the
+        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
+        // HBase checksum verification, but since this value is set without
+        // holding any locks, we may actually do a few more reads than
+        // precisely this number.
+        this.checksumOffCount = CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD;
+        this.useHBaseChecksum = false;
+        doVerificationThruHBaseChecksum = false;
+        is = this.istream;
+        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
+                                    uncompressedSize, pread,
+                                    doVerificationThruHBaseChecksum);
+        if (blk != null) {
+          HFile.LOG.warn("HDFS checksum verification suceeded for file " +
+                         path + " at offset " +
+                         offset + " filesize " + fileSize);
+        }
+      } 
+      if (blk == null && !doVerificationThruHBaseChecksum) {
+        String msg = "readBlockData failed, possibly due to " +
+                     "checksum verification failed for file " + path +
+                     " at offset " + offset + " filesize " + fileSize;
+        HFile.LOG.warn(msg);
+        throw new IOException(msg);
+      }
+
+      // If there was a checksum mismatch earlier, then retry with
+      // HBase checksums switched off and use HDFS checksum verification.
+      // This triggers HDFS to detect and fix corrupt replicas. The
+      // next checksumOffCount read requests will use HDFS checksums.
+      // The decrementing of this.checksumOffCount is not thread-safe,
+      // but it is harmless because eventually checksumOffCount will be
+      // a negative number.
+      if (!this.useHBaseChecksum && this.useHBaseChecksumConfigured) {
+        if (this.checksumOffCount-- < 0) {
+          this.useHBaseChecksum = true; // auto re-enable hbase checksums
+        }
+      }
+      return blk;
+    }
+
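Condensed, the retry policy implemented above amounts to the following (a paraphrase for readers skimming the hunk, with logging and error handling omitted; it is not the literal code):

    // HFileBlock blk = readBlockDataInternal(
    //     useHBaseChecksum ? istreamNoFsChecksum : istream, ..., useHBaseChecksum);
    // if (blk == null) {                                  // HBase checksum mismatch
    //   checksumOffCount = CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD;
    //   useHBaseChecksum = false;                         // fall back to HDFS checksums,
    //   blk = readBlockDataInternal(istream, ..., false); // which also repairs bad replicas
    // }
    // if (!useHBaseChecksum && useHBaseChecksumConfigured && checksumOffCount-- < 0) {
    //   useHBaseChecksum = true;                          // auto re-enable after the threshold
    // }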
+    /**
+     * Reads a version 2 block. 
+     *
+     * @param offset the offset in the stream to read at
+     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
+     *          the header, or -1 if unknown
+     * @param uncompressedSize the uncompressed size of the block. Always
+     *          expected to be -1. This parameter is only used in version 1.
+     * @param pread whether to use a positional read
+     * @param verifyChecksum Whether to use HBase checksums. 
+     *        If HBase checksum is switched off, then use HDFS checksum.
+     * @return the HFileBlock or null if there is an HBase checksum mismatch
+     */
+    private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 
+        long onDiskSizeWithHeaderL,
+        int uncompressedSize, boolean pread, boolean verifyChecksum) 
+        throws IOException {
       if (offset < 0) {
         throw new IOException("Invalid offset=" + offset + " trying to read "
             + "block (onDiskSize=" + onDiskSizeWithHeaderL
@@ -1349,10 +1729,10 @@ public class HFileBlock extends SchemaCo
             "the uncompressed size parameter");
       }
 
-      if ((onDiskSizeWithHeaderL < HEADER_SIZE && onDiskSizeWithHeaderL != -1)
+      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
-            + ": expected to be at least " + HEADER_SIZE
+            + ": expected to be at least " + hdrSize
             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
             + offset + ", uncompressedSize=" + uncompressedSize + ")");
       }
@@ -1369,7 +1749,7 @@ public class HFileBlock extends SchemaCo
         // block's header (e.g. this block's header) when reading the previous
         // block. This is the faster and more preferable case.
 
-        int onDiskSizeWithoutHeader = onDiskSizeWithHeader - HEADER_SIZE;
+        int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
         assert onDiskSizeWithoutHeader >= 0;
 
         // See if we can avoid reading the header. This is desirable, because
@@ -1380,39 +1760,42 @@ public class HFileBlock extends SchemaCo
             ? prefetchedHeader.header : null;
 
         // Size that we have to skip in case we have already read the header.
-        int preReadHeaderSize = header == null ? 0 : HEADER_SIZE;
+        int preReadHeaderSize = header == null ? 0 : hdrSize;
 
         if (compressAlgo == Compression.Algorithm.NONE) {
           // Just read the whole thing. Allocate enough space to read the
           // next block's header too.
 
           ByteBuffer headerAndData = ByteBuffer.allocate(onDiskSizeWithHeader
-              + HEADER_SIZE);
+              + hdrSize);
           headerAndData.limit(onDiskSizeWithHeader);
 
           if (header != null) {
             System.arraycopy(header, 0, headerAndData.array(), 0,
-                HEADER_SIZE);
+                hdrSize);
           }
 
-          int nextBlockOnDiskSizeWithHeader = readAtOffset(
+          int nextBlockOnDiskSizeWithHeader = readAtOffset(is,
               headerAndData.array(), headerAndData.arrayOffset()
                   + preReadHeaderSize, onDiskSizeWithHeader
                   - preReadHeaderSize, true, offset + preReadHeaderSize,
                   pread);
 
-          b = new HFileBlock(headerAndData);
+          b = new HFileBlock(headerAndData, getMinorVersion());
           b.assumeUncompressed();
           b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
           b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSizeWithHeader;
-
+          if (verifyChecksum &&
+              !validateBlockChecksum(b, headerAndData.array(), hdrSize)) {
+            return null;             // checksum mismatch
+          }
           if (b.nextBlockOnDiskSizeWithHeader > 0)
             setNextBlockHeader(offset, b);
         } else {
           // Allocate enough space to fit the next block's header too.
-          byte[] onDiskBlock = new byte[onDiskSizeWithHeader + HEADER_SIZE];
+          byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
 
-          int nextBlockOnDiskSize = readAtOffset(onDiskBlock,
+          int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
               preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
               true, offset + preReadHeaderSize, pread);
 
@@ -1420,32 +1803,38 @@ public class HFileBlock extends SchemaCo
             header = onDiskBlock;
 
           try {
-            b = new HFileBlock(ByteBuffer.wrap(header, 0, HEADER_SIZE));
+            b = new HFileBlock(ByteBuffer.wrap(header, 0, hdrSize), 
+                               getMinorVersion());
           } catch (IOException ex) {
             // Seen in load testing. Provide comprehensive debug info.
             throw new IOException("Failed to read compressed block at "
                 + offset + ", onDiskSizeWithoutHeader=" + onDiskSizeWithHeader
                 + ", preReadHeaderSize=" + preReadHeaderSize
                 + ", header.length=" + header.length + ", header bytes: "
-                + Bytes.toStringBinary(header, 0, HEADER_SIZE), ex);
+                + Bytes.toStringBinary(header, 0, hdrSize), ex);
           }
           b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
           b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
+          if (verifyChecksum && 
+              !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
+            return null;             // checksum mismatch
+          }
 
           DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
-              onDiskBlock, HEADER_SIZE, onDiskSizeWithoutHeader));
+              onDiskBlock, hdrSize, onDiskSizeWithoutHeader));
 
           // This will allocate a new buffer but keep header bytes.
           b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0);
 
-          decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis,
-              onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader);
+          decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis,
+              b.uncompressedSizeWithoutHeader);
 
           // Copy next block's header bytes into the new block if we have them.
           if (nextBlockOnDiskSize > 0) {
             System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
-                b.buf.arrayOffset() + HEADER_SIZE
-                    + b.uncompressedSizeWithoutHeader, HEADER_SIZE);
+                b.buf.arrayOffset() + hdrSize
+                    + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), 
+                hdrSize);
 
             setNextBlockHeader(offset, b);
           }
@@ -1467,12 +1856,12 @@ public class HFileBlock extends SchemaCo
         if (headerBuf == null) {
           // Unfortunately, we still have to do a separate read operation to
           // read the header.
-          headerBuf = ByteBuffer.allocate(HEADER_SIZE);;
-          readAtOffset(headerBuf.array(), headerBuf.arrayOffset(), HEADER_SIZE,
+          headerBuf = ByteBuffer.allocate(hdrSize);
+          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize,
               false, offset, pread);
         }
 
-        b = new HFileBlock(headerBuf);
+        b = new HFileBlock(headerBuf, getMinorVersion());
 
         // This will also allocate enough room for the next block's header.
         b.allocateBuffer(true);
@@ -1482,10 +1871,15 @@ public class HFileBlock extends SchemaCo
           // Avoid creating bounded streams and using a "codec" that does
           // nothing.
           b.assumeUncompressed();
-          b.nextBlockOnDiskSizeWithHeader = readAtOffset(b.buf.array(),
-              b.buf.arrayOffset() + HEADER_SIZE,
-              b.uncompressedSizeWithoutHeader, true, offset + HEADER_SIZE,
+          b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, b.buf.array(),
+              b.buf.arrayOffset() + hdrSize,
+              b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), 
+              true, offset + hdrSize,
               pread);
+          if (verifyChecksum && 
+              !validateBlockChecksum(b, b.buf.array(), hdrSize)) {
+            return null;             // checksum mismatch
+          }
 
           if (b.nextBlockOnDiskSizeWithHeader > 0) {
             setNextBlockHeader(offset, b);
@@ -1493,26 +1887,30 @@ public class HFileBlock extends SchemaCo
         } else {
           // Allocate enough space for the block's header and compressed data.
           byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader()
-              + HEADER_SIZE];
+              + hdrSize];
 
-          b.nextBlockOnDiskSizeWithHeader = readAtOffset(compressedBytes,
-              HEADER_SIZE, b.onDiskSizeWithoutHeader, true, offset
-                  + HEADER_SIZE, pread);
+          b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, compressedBytes,
+              hdrSize, b.onDiskSizeWithoutHeader, true, offset
+                  + hdrSize, pread);
+          if (verifyChecksum &&
+              !validateBlockChecksum(b, compressedBytes, hdrSize)) {
+            return null;             // checksum mismatch
+          }
           DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
-              compressedBytes, HEADER_SIZE, b.onDiskSizeWithoutHeader));
+              compressedBytes, hdrSize, b.onDiskSizeWithoutHeader));
 
-          decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis,
-              b.onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader);
+          decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis,
+              b.uncompressedSizeWithoutHeader);
 
           if (b.nextBlockOnDiskSizeWithHeader > 0) {
             // Copy the next block's header into the new block.
-            int nextHeaderOffset = b.buf.arrayOffset() + HEADER_SIZE
-                + b.uncompressedSizeWithoutHeader;
+            int nextHeaderOffset = b.buf.arrayOffset() + hdrSize
+                + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes();
             System.arraycopy(compressedBytes,
-                compressedBytes.length - HEADER_SIZE,
+                compressedBytes.length - hdrSize,
                 b.buf.array(),
                 nextHeaderOffset,
-                HEADER_SIZE);
+                hdrSize);
 
             setNextBlockHeader(offset, b);
           }
@@ -1527,10 +1925,10 @@ public class HFileBlock extends SchemaCo
     private void setNextBlockHeader(long offset, HFileBlock b) {
       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
       prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
-      int nextHeaderOffset = b.buf.arrayOffset() + HEADER_SIZE
-          + b.uncompressedSizeWithoutHeader;
+      int nextHeaderOffset = b.buf.arrayOffset() + hdrSize
+          + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes();
       System.arraycopy(b.buf.array(), nextHeaderOffset,
-          prefetchedHeader.header, 0, HEADER_SIZE);
+          prefetchedHeader.header, 0, hdrSize);
     }
 
     void setIncludesMemstoreTS(boolean enabled) {
@@ -1540,6 +1938,18 @@ public class HFileBlock extends SchemaCo
     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
       this.dataBlockEncoder = encoder;
     }
+
+    /**
+     * Generates the checksum for the header as well as the data and
+     * then validates that it matches the value stored in the header.
+     * If there is a checksum mismatch, then return false. Otherwise
+     * return true.
+     */
+    protected boolean validateBlockChecksum(HFileBlock block, 
+      byte[] data, int hdrSize) throws IOException {
+      return ChecksumUtil.validateBlockChecksum(path, block,
+                                                data, hdrSize);
+    }
   }
 
   @Override
@@ -1618,5 +2028,87 @@ public class HFileBlock extends SchemaCo
     return DataBlockEncoding.NONE;
   }
 
+  byte getChecksumType() {
+    return this.checksumType;
+  }
+
+  int getBytesPerChecksum() {
+    return this.bytesPerChecksum;
+  }
+
+  int getOnDiskDataSizeWithHeader() {
+    return this.onDiskDataSizeWithHeader;
+  }
+
+  int getMinorVersion() {
+    return this.minorVersion;
+  }
+
+  /** 
+   * Calculate the number of bytes required to store all the checksums
+   * for this block. Each checksum value is a 4-byte integer.
+   */
+  int totalChecksumBytes() {
+    // If the hfile block has minorVersion 0, then there are no checksum
+    // data to validate. Similarly, a zero value in this.bytesPerChecksum
+    // indicates that cached blocks do not have checksum data because
+    // checksums were already validated when the block was read from disk.
+    if (minorVersion < MINOR_VERSION_WITH_CHECKSUM || this.bytesPerChecksum == 0) {
+      return 0;
+    }
+    return (int)ChecksumUtil.numBytes(onDiskDataSizeWithHeader, bytesPerChecksum);
+  }
+
+  /**
+   * Returns the size of this block header.
+   */
+  public int headerSize() {
+    return headerSize(this.minorVersion);
+  }
+
+  /**
+   * Maps a minor version to the size of the header.
+   */
+  static private int headerSize(int minorVersion) {
+    if (minorVersion < MINOR_VERSION_WITH_CHECKSUM) {
+      return HEADER_SIZE_NO_CHECKSUM;
+    }
+    return HEADER_SIZE;
+  }
+
+  /**
+   * Convert the contents of the block header into a human readable string.
+   * This is mostly helpful for debugging. This assumes that the block
+   * has minor version > 0.
+   */
+  static String toStringHeader(ByteBuffer buf) throws IOException {
+    int offset = buf.arrayOffset();
+    byte[] b = buf.array();
+    long magic = Bytes.toLong(b, offset); 
+    BlockType bt = BlockType.read(buf);
+    offset += Bytes.SIZEOF_LONG;
+    int compressedBlockSizeNoHeader = Bytes.toInt(b, offset);
+    offset += Bytes.SIZEOF_INT;
+    int uncompressedBlockSizeNoHeader = Bytes.toInt(b, offset);
+    offset += Bytes.SIZEOF_INT;
+    long prevBlockOffset = Bytes.toLong(b, offset); 
+    offset += Bytes.SIZEOF_LONG;
+    byte cksumtype = b[offset];
+    offset += Bytes.SIZEOF_BYTE;
+    long bytesPerChecksum = Bytes.toInt(b, offset); 
+    offset += Bytes.SIZEOF_INT;
+    long onDiskDataSizeWithHeader = Bytes.toInt(b, offset); 
+    offset += Bytes.SIZEOF_INT;
+    return " Header dump: magic: " + magic +
+                   " blockType " + bt +
+                   " compressedBlockSizeNoHeader " + 
+                   compressedBlockSizeNoHeader +
+                   " uncompressedBlockSizeNoHeader " + 
+                   uncompressedBlockSizeNoHeader +
+                   " prevBlockOffset " + prevBlockOffset +
+                   " checksumType " + ChecksumType.codeToType(cksumtype) +
+                   " bytesPerChecksum " + bytesPerChecksum +
+                   " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
+  }
 }
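
A rough, standalone sketch of the sizing arithmetic behind totalChecksumBytes() and
headerSize() above: one 4-byte checksum per bytesPerChecksum chunk of on-disk data,
and a header that grows from 24 bytes (magic, two size ints, previous-block offset)
to 33 bytes once the checksum type, bytesPerChecksum and onDiskDataSizeWithHeader
fields are appended. The class below is illustrative only; the assumption that
ChecksumUtil.numBytes() amounts to a chunk-wise ceiling division is inferred from
the javadoc, not taken from this commit.

public class ChecksumSizingSketch {
  static final int CHECKSUM_SIZE = 4;             // each checksum value is 4 bytes
  static final int HEADER_SIZE_NO_CHECKSUM = 24;  // magic(8) + 2 ints(8) + prev offset(8)
  static final int HEADER_SIZE = 33;              // + type(1) + bytesPerChecksum(4) + size(4)

  // Bytes of checksum data appended after the on-disk block payload.
  static int totalChecksumBytes(int onDiskDataSizeWithHeader, int bytesPerChecksum) {
    if (bytesPerChecksum == 0) {
      return 0;   // e.g. cached blocks, already verified when read from disk
    }
    int chunks = (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
    return chunks * CHECKSUM_SIZE;
  }

  public static void main(String[] args) {
    // A block whose header plus on-disk data totals 64KB, with an assumed 16KB
    // chunk size, needs ceil(65536 / 16384) = 4 checksums, i.e. 16 bytes.
    System.out.println(totalChecksumBytes(64 * 1024, 16 * 1024));   // prints 16
  }
}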
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java Thu Mar  8 22:55:49 2012
@@ -51,11 +51,12 @@ public interface HFileDataBlockEncoder {
    * Should be called before an encoded or unencoded data block is written to
    * disk.
    * @param in KeyValues next to each other
+   * @param dummyHeader A dummy header to be written as a placeholder
    * @return a non-null on-heap buffer containing the contents of the
    *         HFileBlock with unfilled header and block type
    */
   public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
-      ByteBuffer in, boolean includesMemstoreTS);
+      ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader);
 
   /**
    * Decides whether we should use a scanner over encoded blocks.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java Thu Mar  8 22:55:49 2012
@@ -154,14 +154,14 @@ public class HFileDataBlockEncoderImpl i
    */
   @Override
   public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
-      boolean includesMemstoreTS) {
+      boolean includesMemstoreTS, byte[] dummyHeader) {
     if (onDisk == DataBlockEncoding.NONE) {
       // there is no need to encode the block before writing it to disk
       return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
     }
 
     ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in,
-        onDisk, includesMemstoreTS);
+        onDisk, includesMemstoreTS, dummyHeader);
     return new Pair<ByteBuffer, BlockType>(encodedBuffer,
         BlockType.ENCODED_DATA);
   }
@@ -175,12 +175,13 @@ public class HFileDataBlockEncoderImpl i
   }
 
   private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in,
-      DataBlockEncoding algo, boolean includesMemstoreTS) {
+      DataBlockEncoding algo, boolean includesMemstoreTS,
+      byte[] dummyHeader) {
     ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
     DataOutputStream dataOut = new DataOutputStream(encodedStream);
     DataBlockEncoder encoder = algo.getEncoder();
     try {
-      encodedStream.write(HFileBlock.DUMMY_HEADER);
+      encodedStream.write(dummyHeader);
       algo.writeIdInBytes(dataOut);
       encoder.compressKeyValues(dataOut, in,
           includesMemstoreTS);
@@ -194,13 +195,16 @@ public class HFileDataBlockEncoderImpl i
   private HFileBlock encodeDataBlock(HFileBlock block,
       DataBlockEncoding algo, boolean includesMemstoreTS) {
     ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer(
-        block.getBufferWithoutHeader(), algo, includesMemstoreTS);
-    int sizeWithoutHeader = compressedBuffer.limit() - HFileBlock.HEADER_SIZE;
+        block.getBufferWithoutHeader(), algo, includesMemstoreTS,
+        HFileBlock.DUMMY_HEADER);
+    int sizeWithoutHeader = compressedBuffer.limit() - block.headerSize();
     HFileBlock encodedBlock = new HFileBlock(BlockType.ENCODED_DATA,
         block.getOnDiskSizeWithoutHeader(),
         sizeWithoutHeader, block.getPrevBlockOffset(),
         compressedBuffer, HFileBlock.FILL_HEADER, block.getOffset(),
-        includesMemstoreTS);
+        includesMemstoreTS, block.getMinorVersion(),
+        block.getBytesPerChecksum(), block.getChecksumType(),
+        block.getOnDiskDataSizeWithHeader());
     block.passSchemaMetricsTo(encodedBlock);
     return encodedBlock;
   }
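
The dummyHeader parameter added to beforeWriteToDisk() lets the block writer pass in
a placeholder of the right width for the block's minor version; the encoder reserves
that space at the front of its output, and HFileBlock.Writer overwrites it with the
real header once sizes and checksums are known. A minimal standalone sketch of that
placeholder pattern follows; the 33-byte width, the encoding id and the payload bytes
are illustrative assumptions, not values taken from this commit.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class PlaceholderHeaderSketch {
  public static void main(String[] args) throws IOException {
    byte[] dummyHeader = new byte[33];       // assumed checksum-era header width
    ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(encodedStream);

    encodedStream.write(dummyHeader);        // placeholder, rewritten by the block writer
    dataOut.writeShort(4);                   // stand-in for algo.writeIdInBytes(dataOut)
    dataOut.write(new byte[] {1, 2, 3});     // stand-in for the encoded KeyValues
    dataOut.flush();

    // The caller can now wrap the buffer; the first dummyHeader.length bytes are
    // later replaced with the real header (sizes, prev offset, checksum fields).
    ByteBuffer encoded = ByteBuffer.wrap(encodedStream.toByteArray());
    System.out.println("total=" + encoded.limit() + " payloadStart=" + dummyHeader.length);
  }
}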

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Thu Mar  8 22:55:49 2012
@@ -65,10 +65,10 @@ public class HFileReaderV1 extends Abstr
   public HFileReaderV1(Path path, FixedFileTrailer trailer,
       final FSDataInputStream fsdis, final long size,
       final boolean closeIStream,
-      final CacheConfig cacheConf) {
+      final CacheConfig cacheConf) throws IOException {
     super(path, trailer, fsdis, size, closeIStream, cacheConf);
 
-    trailer.expectVersion(1);
+    trailer.expectMajorVersion(1);
     fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Thu Mar  8 22:55:49 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
@@ -73,6 +74,12 @@ public class HFileReaderV2 extends Abstr
    */
   private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
 
+  /** Minimum minor version supported by this HFile format */
+  static final int MIN_MINOR_VERSION = 0;
+
+  /** Maximum minor version supported by this HFile format */
+  static final int MAX_MINOR_VERSION = 1;
+
   /**
    * Opens a HFile. You must load the index before you can use it by calling
    * {@link #loadFileInfo()}.
@@ -89,14 +96,18 @@ public class HFileReaderV2 extends Abstr
    *          still use its on-disk encoding in cache.
    */
   public HFileReaderV2(Path path, FixedFileTrailer trailer,
-      final FSDataInputStream fsdis, final long size,
+      final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
+      final long size,
       final boolean closeIStream, final CacheConfig cacheConf,
-      DataBlockEncoding preferredEncodingInCache)
+      DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs)
       throws IOException {
-    super(path, trailer, fsdis, size, closeIStream, cacheConf);
-    trailer.expectVersion(2);
+    super(path, trailer, fsdis, fsdisNoFsChecksum, size, 
+          closeIStream, cacheConf, hfs);
+    trailer.expectMajorVersion(2);
+    validateMinorVersion(path, trailer.getMinorVersion());
     HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
-        compressAlgo, fileSize);
+        fsdisNoFsChecksum,
+        compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
     this.fsBlockReader = fsBlockReaderV2; // upcast
 
     // Comparator class name is stored in the trailer in version 2.
@@ -411,9 +422,15 @@ public class HFileReaderV2 extends Abstr
           + " block(s)");
       }
     }
-    if (closeIStream && istream != null) {
-      istream.close();
-      istream = null;
+    if (closeIStream) {
+      if (istream != istreamNoFsChecksum && istreamNoFsChecksum != null) {
+        istreamNoFsChecksum.close();
+        istreamNoFsChecksum = null;
+      }
+      if (istream != null) {
+        istream.close();
+        istream = null;
+      }
     }
   }
 
@@ -915,9 +932,9 @@ public class HFileReaderV2 extends Abstr
     private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
       ByteBuffer origBlock = newBlock.getBufferReadOnly();
       ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
-          origBlock.arrayOffset() + HFileBlock.HEADER_SIZE +
+          origBlock.arrayOffset() + newBlock.headerSize() +
           DataBlockEncoding.ID_SIZE,
-          origBlock.limit() - HFileBlock.HEADER_SIZE -
+          newBlock.getUncompressedSizeWithoutHeader() -
           DataBlockEncoding.ID_SIZE).slice();
       return encodedBlock;
     }
@@ -1053,4 +1070,19 @@ public class HFileReaderV2 extends Abstr
     return true; // We load file info in constructor in version 2.
   }
 
+  /**
+   * Validates that the minor version is within acceptable limits.
+   * Otherwise it throws a RuntimeException.
+   */
+  private void validateMinorVersion(Path path, int minorVersion) {
+    if (minorVersion < MIN_MINOR_VERSION ||
+        minorVersion > MAX_MINOR_VERSION) {
+      String msg = "Minor version for path " + path + 
+                   " is expected to be between " +
+                   MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
+                   " but is found to be " + minorVersion;
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+  }
 }
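
The close() change above has to handle two input streams over the same file, one that
still performs HDFS-level checksum verification and one that does not, and the two may
in fact be the same object. A small sketch of that shutdown rule as a standalone
helper (the helper itself is illustrative, not an API introduced by this commit):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;

public class DualStreamCloseSketch {
  // Close the checksum-free stream only when it is a distinct object, then close
  // the primary stream, so each underlying stream is closed exactly once.
  static void closeStreams(FSDataInputStream istream,
      FSDataInputStream istreamNoFsChecksum) throws IOException {
    if (istream != istreamNoFsChecksum && istreamNoFsChecksum != null) {
      istreamNoFsChecksum.close();
    }
    if (istream != null) {
      istream.close();
    }
  }
}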

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Thu Mar  8 22:55:49 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -92,8 +93,9 @@ public class HFileWriterV1 extends Abstr
     public Writer createWriter(FileSystem fs, Path path,
         FSDataOutputStream ostream, int blockSize,
         Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
-        KeyComparator comparator)
-        throws IOException {
+        KeyComparator comparator, final ChecksumType checksumType,
+        final int bytesPerChecksum) throws IOException {
+      // version 1 does not implement checksums
       return new HFileWriterV1(conf, cacheConf, fs, path, ostream, blockSize,
           compressAlgo, dataBlockEncoder, comparator);
     }
@@ -149,7 +151,13 @@ public class HFileWriterV1 extends Abstr
       HFileBlock block = new HFileBlock(BlockType.DATA,
           (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
           ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
-          blockBegin, MemStore.NO_PERSISTENT_TS);
+          blockBegin, MemStore.NO_PERSISTENT_TS, 
+          HFileBlock.MINOR_VERSION_NO_CHECKSUM,        // minor version
+          0,                                         // bytesPerChecksum
+          ChecksumType.NULL.getCode(),               // checksum type
+          (int) (outputStream.getPos() - blockBegin) +
+          HFileBlock.HEADER_SIZE_NO_CHECKSUM);       // onDiskDataSizeWithHeader
+
       block = blockEncoder.diskToCacheFormat(block, false);
       passSchemaMetricsTo(block);
       cacheConf.getBlockCache().cacheBlock(
@@ -174,7 +182,7 @@ public class HFileWriterV1 extends Abstr
     if (cacheConf.shouldCacheDataOnWrite()) {
       this.baos = new ByteArrayOutputStream();
       this.baosDos = new DataOutputStream(baos);
-      baosDos.write(HFileBlock.DUMMY_HEADER);
+      baosDos.write(HFileBlock.DUMMY_HEADER_NO_CHECKSUM);
     }
   }
 
@@ -332,7 +340,8 @@ public class HFileWriterV1 extends Abstr
 
     finishBlock();
 
-    FixedFileTrailer trailer = new FixedFileTrailer(1);
+    FixedFileTrailer trailer = new FixedFileTrailer(1,
+                                 HFileBlock.MINOR_VERSION_NO_CHECKSUM);
 
     // Write out the metadata blocks if any.
     ArrayList<Long> metaOffsets = null;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Thu Mar  8 22:55:49 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.KeyValue.
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -81,6 +82,10 @@ public class HFileWriterV2 extends Abstr
   private List<BlockWritable> additionalLoadOnOpenData =
     new ArrayList<BlockWritable>();
 
+  /** Checksum related settings */
+  private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
+  private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
+
   private final boolean includeMemstoreTS = true;
   private long maxMemstoreTS = 0;
 
@@ -93,9 +98,10 @@ public class HFileWriterV2 extends Abstr
     public Writer createWriter(FileSystem fs, Path path,
         FSDataOutputStream ostream, int blockSize,
         Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
-        final KeyComparator comparator) throws IOException {
+        final KeyComparator comparator, final ChecksumType checksumType,
+        final int bytesPerChecksum) throws IOException {
       return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize,
-          compress, blockEncoder, comparator);
+          compress, blockEncoder, comparator, checksumType, bytesPerChecksum);
     }
   }
 
@@ -103,11 +109,14 @@ public class HFileWriterV2 extends Abstr
   public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
       FileSystem fs, Path path, FSDataOutputStream ostream, int blockSize,
       Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
-      final KeyComparator comparator) throws IOException {
+      final KeyComparator comparator, final ChecksumType checksumType,
+      final int bytesPerChecksum) throws IOException {
     super(cacheConf,
         ostream == null ? createOutputStream(conf, fs, path) : ostream,
         path, blockSize, compressAlgo, blockEncoder, comparator);
     SchemaMetrics.configureGlobally(conf);
+    this.checksumType = checksumType;
+    this.bytesPerChecksum = bytesPerChecksum;
     finishInit(conf);
   }
 
@@ -118,7 +127,7 @@ public class HFileWriterV2 extends Abstr
 
     // HFile filesystem-level (non-caching) block writer
     fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
-        includeMemstoreTS);
+        includeMemstoreTS, checksumType, bytesPerChecksum);
 
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
@@ -356,7 +365,8 @@ public class HFileWriterV2 extends Abstr
     finishBlock();
     writeInlineBlocks(true);
 
-    FixedFileTrailer trailer = new FixedFileTrailer(2);
+    FixedFileTrailer trailer = new FixedFileTrailer(2, 
+                                 HFileReaderV2.MAX_MINOR_VERSION);
 
     // Write out the metadata blocks if any.
     if (!metaNames.isEmpty()) {
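
With checksumType and bytesPerChecksum now threaded from the writer factory into
HFileBlock.Writer, a v2 writer is opened roughly as below. This is a hedged usage
sketch, not code from the commit: the checksum defaults are the HFile constants the
hunk references, while the no-op encoder singleton, the block size and the comparator
choice are assumptions made only for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;

public class ChecksumWriterSketch {
  static HFileWriterV2 open(Configuration conf, FileSystem fs, Path path)
      throws IOException {
    return new HFileWriterV2(conf, new CacheConfig(conf), fs, path,
        null,                            // ostream == null: the writer creates it
        64 * 1024,                       // block size (assumed)
        Compression.Algorithm.NONE,
        NoOpDataBlockEncoder.INSTANCE,   // assumed no-op encoder singleton
        KeyValue.KEY_COMPARATOR,         // assumed key comparator
        HFile.DEFAULT_CHECKSUM_TYPE,
        HFile.DEFAULT_BYTES_PER_CHECKSUM);
  }
}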

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java Thu Mar  8 22:55:49 2012
@@ -46,7 +46,7 @@ public class NoOpDataBlockEncoder implem
 
   @Override
   public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
-      ByteBuffer in, boolean includesMemstoreTS) {
+      ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader) {
     return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Thu Mar  8 22:55:49 2012
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -553,6 +554,8 @@ public class LoadIncrementalHFiles exten
               .withCompression(compression)
               .withDataBlockEncoder(dataBlockEncoder)
               .withBloomType(bloomFilterType)
+              .withChecksumType(Store.getChecksumType(conf))
+              .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
               .build();
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Mar  8 22:55:49 2012
@@ -3624,7 +3624,14 @@ public class HRegion implements HeapSize
     }
     Path dir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
       info.getTableName());
-    HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
+    FileSystem fs = null;
+    if (rsServices != null) {
+      fs = rsServices.getFileSystem();
+    }
+    if (fs == null) {
+      fs = FileSystem.get(conf);
+    }
+    HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info,
       htd, rsServices);
     return r.openHRegion(reporter);
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Mar  8 22:55:49 2012
@@ -106,6 +106,7 @@ import org.apache.hadoop.hbase.executor.
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -195,7 +196,8 @@ public class HRegionServer implements HR
   protected final Configuration conf;
 
   protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
-  private FileSystem fs;
+  private HFileSystem fs;
+  private boolean useHBaseChecksum; // verify hbase checksums?
   private Path rootDir;
   private final Random rand = new Random();
 
@@ -368,6 +370,11 @@ public class HRegionServer implements HR
     this.isOnline = false;
     checkCodecs(this.conf);
 
+    // do we use checksum verification in hbase? If hbase checksum verification
+    // is enabled, then we automatically switch off hdfs checksum verification.
+    this.useHBaseChecksum = conf.getBoolean(
+      HConstants.HBASE_CHECKSUM_VERIFICATION, true);
+
     // Config'ed params
     this.numRetries = conf.getInt("hbase.client.retries.number", 10);
     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
@@ -978,7 +985,7 @@ public class HRegionServer implements HR
       // to defaults).
       this.conf.set("fs.defaultFS", this.conf.get("hbase.rootdir"));
       // Get fs instance used by this RS
-      this.fs = FileSystem.get(this.conf);
+      this.fs = new HFileSystem(this.conf, this.useHBaseChecksum);
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
       this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
       this.hlog = setupWALAndReplication();
@@ -1278,7 +1285,7 @@ public class HRegionServer implements HR
    * @throws IOException
    */
   protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
-    return new HLog(this.fs, logdir, oldLogDir, this.conf,
+    return new HLog(this.fs.getBackingFs(), logdir, oldLogDir, this.conf,
       getWALActionListeners(), this.serverNameFromMasterPOV.toString());
   }
 
@@ -3165,7 +3172,7 @@ public class HRegionServer implements HR
   /**
    * @return Return the fs.
    */
-  protected FileSystem getFileSystem() {
+  public FileSystem getFileSystem() {
     return fs;
   }
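
The region server now wraps its FileSystem in an HFileSystem: when hbase-level
checksum verification is enabled (the default), HFile reads skip HDFS checksum
verification, while the WAL keeps going through the backing filesystem via
getBackingFs(). A standalone sketch of that wiring; the setup and the printed URI
are illustrative, the constants and methods are the ones named in the hunks above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;

public class ChecksumFsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // If hbase checksums are on, hdfs-level verification is turned off for HFiles.
    boolean useHBaseChecksum =
        conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    HFileSystem fs = new HFileSystem(conf, useHBaseChecksum);

    // The WAL still uses the unwrapped filesystem, as in instantiateHLog() above.
    FileSystem walFs = fs.getBackingFs();
    System.out.println(walFs.getUri());
  }
}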
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Thu Mar  8 22:55:49 2012
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -80,5 +81,9 @@ public interface RegionServerServices ex
    * @return map of regions in transition in this RS
    */
   public Map<byte[], Boolean> getRegionsInTransitionInRS();
-  
+
+  /**
+   * @return Return the FileSystem object used by the regionserver
+   */
+  public FileSystem getFileSystem();
 }


