hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1298641 [1/4] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/fs/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/region...
Date Thu, 08 Mar 2012 22:55:50 GMT
Author: mbautin
Date: Thu Mar  8 22:55:49 2012
New Revision: 1298641

URL: http://svn.apache.org/viewvc?rev=1298641&view=rev
Log:
[jira] [HBASE-5074] Support checksums in HBase block cache

Author: Dhruba

Summary:
HFile is enhanced to store a checksum for each block. HDFS checksum verification
is avoided while reading data into the block cache. On a checksum verification
failure, we retry the file system read request with hdfs checksums switched on
(thanks Todd).

I have a benchmark that shows that it reduces iops on the disk by about 40%. In
this experiment, the entire memory on the regionserver is allocated to the
regionserver's jvm and the OS buffer cache size is negligible. I also measured
negligible (<5%) additional cpu usage while using hbase-level checksums.

The salient points of this patch:

1. Each hfile's trailer used to have a 4 byte version number. I enhanced this so
that these 4 bytes can be interpreted as a (major version number, minor
version). Pre-existing hfiles have a minor version of 0. The new hfile format
has a minor version of 1 (thanks Mikhail). The hfile major version remains
unchanged at 2. The reason I did not introduce a new major version number is
because the code changes needed to store/read checksums do not differ much from
existing V2 writers/readers.

2. Introduced an HFileSystem object that encapsulates the FileSystem
objects needed to access data from hfiles and hlogs.  HDFS FileSystem objects
already had the ability to switch off checksum verifications for reads.

3. The majority of the code changes are located in hbase.io.hfile package. The
retry of a read on an initial checksum failure occurs inside the hbase.io.hfile
package itself.  The code changes to hbase.regionserver package are minor.

4. The format of a hfileblock is the header followed by the data followed by the
checksum(s). Each 16 K (configurable) size of data has a 4 byte checksum.  The
hfileblock header has two additional fields: a 4 byte value to store the
bytesPerChecksum and a 4 byte value to store the size of the user data
(excluding the checksum data). This is well explained in the associated
javadocs.

5. I added a test to test backward compatibility. I will be writing more unit
tests that trigger checksum verification failures aggressively. I have left a
few redundant log messages in the code (just for easier debugging) and will
remove them in a later stage of this patch. I will also be adding metrics on the
number of checksum verification failures/successes in a later version of this
diff.

6. By default, hbase-level checksums are switched on and hdfs level checksums
are switched off for hfile-reads. No changes to Hlog code path here.
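
Point 1 above (the 4-byte trailer version split into major and minor parts) can be sketched as follows. This is only an illustration: the class name and the two extract helpers are hypothetical, and the bit layout is an assumption based on the FixedFileTrailer javadoc in this commit (minor version in the byte preceding the last three bytes, major version in the last three bytes, written as a big-endian int).

```java
// Hypothetical sketch; only the name materializeVersion appears in the patch,
// and its body here is an assumption derived from the javadoc.
final class TrailerVersionSketch {

  /** Pack the minor version into the high byte and the major version into the low three bytes. */
  static int materializeVersion(int majorVersion, int minorVersion) {
    return (majorVersion & 0x00ffffff) | (minorVersion << 24);
  }

  /** Recover the major version from the low three bytes. */
  static int extractMajorVersion(int serializedVersion) {
    return serializedVersion & 0x00ffffff;
  }

  /** Recover the minor version from the high byte. */
  static int extractMinorVersion(int serializedVersion) {
    return serializedVersion >>> 24;
  }

  public static void main(String[] args) {
    int packed = materializeVersion(2, 1);
    System.out.println("major=" + extractMajorVersion(packed)
        + " minor=" + extractMinorVersion(packed));
  }
}
```

Under this layout a pre-existing hfile whose trailer ends with the integer 2 decodes as major version 2, minor version 0, which matches the backward-compatibility claim in point 1.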

Test Plan: The default setting is to switch on hbase checksums for hfile-reads,
thus all existing tests actually validate the new code pieces. I will be writing
more unit tests for triggering checksum verification failures.

Reviewers: mbautin

Reviewed By: mbautin

CC: JIRA, tedyu, mbautin, dhruba, todd, stack

Differential Revision: https://reviews.facebook.net/D1521

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/fs/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ChecksumFactory.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Mar  8 22:55:49 2012
@@ -304,7 +304,7 @@ public final class HConstants {
   
   /** The regioninfo column qualifier */
   public static final byte [] REGIONINFO_QUALIFIER = 
-	  Bytes.toBytes(REGIONINFO_QUALIFIER_STR);
+    Bytes.toBytes(REGIONINFO_QUALIFIER_STR);
   
   /** The server column qualifier */
   public static final byte [] SERVER_QUALIFIER = Bytes.toBytes("server");
@@ -610,6 +610,35 @@ public final class HConstants {
   /** Host name of the local machine */
   public static final String LOCALHOST = "localhost";
 
+  /** 
+   * If this parameter is set to true, then hbase will read
+   * data and then verify checksums. Checksum verification 
+   * inside hdfs will be switched off.  However, if the hbase-checksum 
+   * verification fails, then it will switch back to using
+   * hdfs checksums for verifying data that is being read from storage.
+   *
+   * If this parameter is set to false, then hbase will not
+   * verify any checksums, instead it will depend on checksum verification
+   * being done in the hdfs client.
+   */
+  public static final String HBASE_CHECKSUM_VERIFICATION = 
+      "hbase.regionserver.checksum.verify";
+
+  /**
+   * The name of the configuration parameter that specifies
+   * the number of bytes in a newly created checksum chunk.
+   */
+  public static final String BYTES_PER_CHECKSUM =
+      "hbase.hstore.bytes.per.checksum";
+
+  /**
+   * The name of the configuration parameter that specifies
+   * the name of an algorithm that is used to compute checksums
+   * for newly created blocks.
+   */
+  public static final String CHECKSUM_TYPE_NAME =
+      "hbase.hstore.checksum.algorithm";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java?rev=1298641&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java Thu Mar  8 22:55:49 2012
@@ -0,0 +1,177 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * An encapsulation for the FileSystem object that hbase uses to access
+ * data. This class allows the flexibility of using  
+ * separate filesystem objects for reading and writing hfiles and hlogs.
+ * In future, if we want to make hlogs be in a different filesystem,
+ * this is the place to make it happen.
+ */
+public class HFileSystem extends FilterFileSystem {
+
+  private final FileSystem noChecksumFs;   // read hfile data from storage
+  private final boolean useHBaseChecksum;
+
+  /**
+   * Create a FileSystem object for HBase regionservers.
+   * @param conf The configuration to be used for the filesystem
+   * @param useHBaseChecksum if true, then use
+   *        checksum verification in hbase, otherwise
+   *        delegate checksum verification to the FileSystem.
+   */
+  public HFileSystem(Configuration conf, boolean useHBaseChecksum)
+    throws IOException {
+
+    // Create the default filesystem with checksum verification switched on.
+    // By default, any operation to this FilterFileSystem occurs on
+    // the underlying filesystem that has checksums switched on.
+    this.fs = FileSystem.get(conf);
+    this.useHBaseChecksum = useHBaseChecksum;
+    
+    fs.initialize(getDefaultUri(conf), conf);
+
+    // If hbase checksum verification is switched on, then create a new
+    // filesystem object that has cksum verification turned off.
+    // We will avoid verifying checksums in the fs client, instead do it
+    // inside of hbase.
+    if (useHBaseChecksum) {
+      this.noChecksumFs = newInstanceFileSystem(conf);
+      this.noChecksumFs.setVerifyChecksum(false);
+    } else {
+      this.noChecksumFs = fs;
+    }
+  }
+
+  /**
+   * Wrap a FileSystem object within a HFileSystem. The noChecksumFs and
+   * writefs are both set to be the same specified fs. 
+   * Do not verify hbase-checksums while reading data from filesystem.
+   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
+   */
+  public HFileSystem(FileSystem fs) {
+    this.fs = fs;
+    this.noChecksumFs = fs;
+    this.useHBaseChecksum = false;
+  }
+
+  /**
+   * Returns the filesystem that is specially setup for 
+   * doing reads from storage. This object avoids doing 
+   * checksum verifications for reads.
+   * @return The FileSystem object that can be used to read data
+   *         from files.
+   */
+  public FileSystem getNoChecksumFs() {
+    return noChecksumFs;
+  }
+
+  /**
+   * Returns the underlying filesystem
+   * @return The underlying FileSystem for this FilterFileSystem object.
+   */
+  public FileSystem getBackingFs() throws IOException {
+    return fs;
+  }
+
+  /**
+   * Are we verifying checksums in HBase?
+   * @return True, if hbase is configured to verify checksums,
+   *         otherwise false.
+   */
+  public boolean useHBaseChecksum() {
+    return useHBaseChecksum;
+  }
+
+  /**
+   * Close this filesystem object
+   */
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if (this.noChecksumFs != fs) {
+      this.noChecksumFs.close();
+    }
+  }
+
+ /**
+   * Returns a brand new instance of the FileSystem. It does not use
+   * the FileSystem.Cache. In newer versions of HDFS, we can directly
+   * invoke FileSystem.newInstance(Configuration).
+   * 
+   * @param conf Configuration
+   * @return A new instance of the filesystem
+   */
+  private static FileSystem newInstanceFileSystem(Configuration conf)
+    throws IOException {
+    URI uri = FileSystem.getDefaultUri(conf);
+    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
+    if (clazz == null) {
+      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
+    }
+    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
+    fs.initialize(uri, conf);
+    return fs;
+  }
+
+  /**
+   * Create a new HFileSystem object, similar to FileSystem.get().
+   * This returns a filesystem object that avoids checksum
+   * verification in the filesystem for hfileblock-reads.
+   * For these blocks, checksum verification is done by HBase.
+   */
+  static public FileSystem get(Configuration conf) throws IOException {
+    return new HFileSystem(conf, true);
+  }
+
+  /**
+   * Wrap a LocalFileSystem within a HFileSystem.
+   */
+  static public FileSystem getLocalFs(Configuration conf) throws IOException {
+    return new HFileSystem(FileSystem.getLocal(conf));
+  }
+
+  /**
+   * The org.apache.hadoop.fs.FilterFileSystem does not yet support 
+   * createNonRecursive. This is a hadoop bug and when it is fixed in Hadoop,
+   * this definition will go away.
+   */
+  public FSDataOutputStream createNonRecursive(Path f,
+      boolean overwrite,
+      int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return fs.createNonRecursive(f, overwrite, bufferSize, replication,
+                                 blockSize, progress);
+  }
+}
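
The read path described in the commit summary (read without filesystem checksums, verify in HBase, and retry with HDFS checksums on a mismatch) might look roughly like the sketch below. The `BlockReader` and `Verifier` interfaces and the `readBlock` method are hypothetical stand-ins for the two filesystem handles exposed by HFileSystem and for the HBase-level check; they are not part of the patch, and checked exceptions are omitted for brevity.

```java
// Hypothetical sketch of the fallback logic; none of these names exist in HBase.
final class ChecksumFallbackSketch {

  /** Stand-in for a stream opened on either the checksummed or the no-checksum filesystem. */
  interface BlockReader {
    byte[] read(long offset);
  }

  /** Stand-in for HBase-level checksum validation of a block. */
  interface Verifier {
    boolean matches(byte[] block);
  }

  static byte[] readBlock(long offset,
                          BlockReader noFsChecksum,
                          BlockReader withFsChecksum,
                          Verifier hbaseChecksum) {
    // Fast path: skip filesystem checksum verification and verify in HBase.
    byte[] block = noFsChecksum.read(offset);
    if (hbaseChecksum.matches(block)) {
      return block;
    }
    // Slow path: the HBase checksum did not match, so re-read through the
    // filesystem that verifies its own checksums.
    return withFsChecksum.read(offset);
  }
}
```

The point of the design is that the common case pays for one checksum pass instead of two, while a mismatch still falls back to the filesystem's own verification.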

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Thu Mar  8 22:55:49 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
@@ -42,9 +43,13 @@ public abstract class AbstractHFileReade
   /** Filesystem-level block reader for this HFile format version. */
   protected HFileBlock.FSReader fsBlockReader;
 
-  /** Stream to read from. */
+  /** Stream to read from. Does checksum verifications in file system */
   protected FSDataInputStream istream;
 
+  /** The file system stream of the underlying {@link HFile} that
+   * does not do checksum verification in the file system */
+  protected FSDataInputStream istreamNoFsChecksum;
+
   /**
    * True if we should close the input stream when done. We don't close it if we
    * didn't open it.
@@ -99,10 +104,21 @@ public abstract class AbstractHFileReade
 
   protected FileInfo fileInfo;
 
+  /** The filesystem used for accessing data */
+  protected HFileSystem hfs;
+
   protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
       final FSDataInputStream fsdis, final long fileSize,
       final boolean closeIStream,
       final CacheConfig cacheConf) {
+    this(path, trailer, fsdis, fsdis, fileSize, closeIStream, cacheConf, null);
+  }
+
+  protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
+      final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
+      final long fileSize,
+      final boolean closeIStream,
+      final CacheConfig cacheConf, final HFileSystem hfs) {
     super(null, path);
     this.trailer = trailer;
     this.compressAlgo = trailer.getCompressionCodec();
@@ -112,6 +128,8 @@ public abstract class AbstractHFileReade
     this.closeIStream = closeIStream;
     this.path = path;
     this.name = path.getName();
+    this.hfs = hfs;
+    this.istreamNoFsChecksum = fsdisNoFsChecksum;
   }
 
   @SuppressWarnings("serial")
@@ -343,5 +361,4 @@ public abstract class AbstractHFileReade
   public DataBlockEncoding getEncodingOnDisk() {
     return dataBlockEncoder.getEncodingOnDisk();
   }
-
 }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java?rev=1298641&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java Thu Mar  8 22:55:49 2012
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ChecksumFactory;
+import org.apache.hadoop.hbase.util.ChecksumType;
+
+/**
+ * Utility methods to compute and validate checksums.
+ */
+public class ChecksumUtil {
+
+  /** This is used to reserve space in a byte buffer */
+  private static byte[] DUMMY_VALUE = new byte[128 * HFileBlock.CHECKSUM_SIZE];
+
+  /** 
+   * This is used by unit tests to make checksum failures throw an 
+   * exception instead of returning null. Returning a null value from 
+   * checksum validation will cause the higher layer to retry that 
+   * read with hdfs-level checksums. Instead, we would like checksum 
+   * failures to cause the entire unit test to fail.
+   */
+  private static boolean generateExceptions = false;
+
+  /**
+   * Generates a checksum for all the data in indata. The checksum is
+   * written to outdata.
+   * @param indata input data stream
+   * @param startOffset starting offset in the indata stream from which to
+   *                    compute checksums
+   * @param endOffset ending offset in the indata stream up to
+   *                   which checksums need to be computed
+   * @param outdata the output buffer where checksum values are written
+   * @param outOffset the starting offset in the outdata where the
+   *                  checksum values are written
+   * @param checksumType type of checksum
+   * @param bytesPerChecksum number of bytes per checksum value
+   */
+  static void generateChecksums(byte[] indata,
+    int startOffset, int endOffset, 
+    byte[] outdata, int outOffset,
+    ChecksumType checksumType,
+    int bytesPerChecksum) throws IOException {
+
+    if (checksumType == ChecksumType.NULL) {
+      return; // No checksums for this block.
+    }
+
+    Checksum checksum = checksumType.getChecksumObject();
+    int bytesLeft = endOffset - startOffset;
+    int chunkNum = 0;
+
+    while (bytesLeft > 0) {
+      // generate the checksum for one chunk
+      checksum.reset();
+      int count = Math.min(bytesLeft, bytesPerChecksum);
+      checksum.update(indata, startOffset, count);
+
+      // write the checksum value to the output buffer.
+      int cksumValue = (int)checksum.getValue();
+      outOffset = Bytes.putInt(outdata, outOffset, cksumValue);
+      chunkNum++;
+      startOffset += count;
+      bytesLeft -= count;
+    }
+  }
+
+  /**
+   * Validates that the data in the specified HFileBlock matches the
+   * checksum.  Generates the checksum for the data and
+   * then validate that it matches the value stored in the header.
+   * If there is a checksum mismatch, then return false. Otherwise
+   * return true.
+   * The header is extracted from the specified HFileBlock while the
+   * data-to-be-verified is extracted from 'data'.
+   */
+  static boolean validateBlockChecksum(Path path, HFileBlock block, 
+    byte[] data, int hdrSize) throws IOException {
+
+    // If this is an older version of the block that does not have
+    // checksums, then return false indicating that checksum verification
+    // did not succeed. Actually, this method should never be called
+    // when the minorVersion is 0, thus this is a defensive check for a
+    // cannot-happen case. Since this is a cannot-happen case, it is
+    // better to return false to indicate a checksum validation failure.
+    if (block.getMinorVersion() < HFileBlock.MINOR_VERSION_WITH_CHECKSUM) {
+      return false;
+    }
+
+    // Get a checksum object based on the type of checksum that is
+    // set in the HFileBlock header. A ChecksumType.NULL indicates that 
+    // the caller is not interested in validating checksums, so we
+    // always return true.
+    ChecksumType cktype = ChecksumType.codeToType(block.getChecksumType());
+    if (cktype == ChecksumType.NULL) {
+      return true; // No checksum validation needed for this block.
+    }
+    Checksum checksumObject = cktype.getChecksumObject();
+    checksumObject.reset();
+
+    // read in the stored value of the checksum size from the header.
+    int bytesPerChecksum = block.getBytesPerChecksum();
+
+    // bytesPerChecksum is always larger than the size of the header
+    if (bytesPerChecksum < hdrSize) {
+      String msg = "Unsupported value of bytesPerChecksum. " +
+                   " Minimum is " + hdrSize + 
+                   " but the configured value is " + bytesPerChecksum;
+      HFile.LOG.warn(msg);
+      return false;   // cannot happen case, unable to verify checksum
+    }
+    // Extract the header and compute checksum for the header.
+    ByteBuffer hdr = block.getBufferWithHeader();
+    checksumObject.update(hdr.array(), hdr.arrayOffset(), hdrSize);
+
+    int off = hdrSize;
+    int consumed = hdrSize;
+    int bytesLeft = block.getOnDiskDataSizeWithHeader() - off;
+    int cksumOffset = block.getOnDiskDataSizeWithHeader();
+    
+    // validate each chunk
+    while (bytesLeft > 0) {
+      int thisChunkSize = bytesPerChecksum - consumed;
+      int count = Math.min(bytesLeft, thisChunkSize);
+      checksumObject.update(data, off, count);
+
+      int storedChecksum = Bytes.toInt(data, cksumOffset);
+      if (storedChecksum != (int)checksumObject.getValue()) {
+        String msg = "File " + path +
+                     " Stored checksum value of " + storedChecksum +
+                     " at offset " + cksumOffset +
+                     " does not match computed checksum " +
+                     checksumObject.getValue() +
+                     ", total data size " + data.length +
+                     " Checksum data range offset " + off + " len " + count +
+                     HFileBlock.toStringHeader(block.getBufferReadOnly());
+        HFile.LOG.warn(msg);
+        if (generateExceptions) {
+          throw new IOException(msg); // this is only for unit tests
+        } else {
+          return false;               // checksum validation failure
+        }
+      }
+      cksumOffset += HFileBlock.CHECKSUM_SIZE;
+      bytesLeft -= count; 
+      off += count;
+      consumed = 0;
+      checksumObject.reset();
+    }
+    return true; // checksum is valid
+  }
+
+  /**
+   * Returns the number of bytes needed to store the checksums for
+   * a specified data size
+   * @param datasize number of bytes of data
+   * @param bytesPerChecksum number of bytes in a checksum chunk
+   * @return The number of bytes needed to store the checksum values
+   */
+  static long numBytes(long datasize, int bytesPerChecksum) {
+    return numChunks(datasize, bytesPerChecksum) * 
+                     HFileBlock.CHECKSUM_SIZE;
+  }
+
+  /**
+   * Returns the number of checksum chunks needed to store the checksums for
+   * a specified data size
+   * @param datasize number of bytes of data
+   * @param bytesPerChecksum number of bytes in a checksum chunk
+   * @return The number of checksum chunks
+   */
+  static long numChunks(long datasize, int bytesPerChecksum) {
+    long numChunks = datasize/bytesPerChecksum;
+    if (datasize % bytesPerChecksum != 0) {
+      numChunks++;
+    }
+    return numChunks;
+  }
+
+  /**
+   * Write dummy checksums to the end of the specified bytes array
+   * to reserve space for writing checksums later
+   * @param baos OutputStream to write dummy checksum values
+   * @param numBytes Number of bytes of data for which dummy checksums
+   *                 need to be generated
+   * @param bytesPerChecksum Number of bytes per checksum value
+   */
+  static void reserveSpaceForChecksums(ByteArrayOutputStream baos,
+    int numBytes, int bytesPerChecksum) throws IOException {
+    long numChunks = numChunks(numBytes, bytesPerChecksum);
+    long bytesLeft = numChunks * HFileBlock.CHECKSUM_SIZE;
+    while (bytesLeft > 0) {
+      long count = Math.min(bytesLeft, DUMMY_VALUE.length);
+      baos.write(DUMMY_VALUE, 0, (int)count);
+      bytesLeft -= count;
+    }
+  }
+
+  /**
+   * Mechanism to throw an exception in case of hbase checksum
+   * failure. This is used by unit tests only.
+   * @param value Setting this to true will cause hbase checksum
+   *              verification failures to generate exceptions.
+   */
+  public static void generateExceptionForChecksumFailureForTest(boolean value) {
+    generateExceptions = value;
+  }
+}
+
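
The chunking arithmetic in ChecksumUtil can be illustrated with a small standalone sketch. It assumes CRC32 from java.util.zip as the algorithm (the patch selects the algorithm through ChecksumType, which is not shown in this part of the diff) and treats the header plus data as one flat byte array, which is how validateBlockChecksum effectively walks the block. The class and method names below are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;

// Hypothetical standalone sketch; not the ChecksumUtil code from the patch.
final class ChunkChecksumSketch {
  static final int CHECKSUM_SIZE = 4; // one 4-byte checksum per chunk

  /** Number of chunks needed to cover dataSize bytes (rounds up). */
  static int numChunks(int dataSize, int bytesPerChecksum) {
    return (dataSize + bytesPerChecksum - 1) / bytesPerChecksum;
  }

  /** Compute one CRC32 value per chunk and return them as consecutive big-endian ints. */
  static byte[] generateChecksums(byte[] data, int bytesPerChecksum) {
    ByteBuffer out = ByteBuffer.allocate(
        numChunks(data.length, bytesPerChecksum) * CHECKSUM_SIZE);
    CRC32 crc = new CRC32();
    for (int off = 0; off < data.length; off += bytesPerChecksum) {
      int count = Math.min(bytesPerChecksum, data.length - off);
      crc.reset();
      crc.update(data, off, count);
      out.putInt((int) crc.getValue());
    }
    return out.array();
  }

  /** Recompute the per-chunk checksums and compare them with the stored ones. */
  static boolean validate(byte[] data, byte[] storedChecksums, int bytesPerChecksum) {
    return Arrays.equals(generateChecksums(data, bytesPerChecksum), storedChecksums);
  }
}
```

For example, 40000 bytes with the 16 K chunk size mentioned in the commit log need three chunks, so 12 bytes of checksum data follow the block.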

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java Thu Mar  8 22:55:49 2012
@@ -44,6 +44,13 @@ import com.google.common.io.NullOutputSt
  * variable parts of the file. Also includes basic metadata on this file. The
  * trailer size is fixed within a given {@link HFile} format version only, but
  * we always store the version number as the last four-byte integer of the file.
+ * The version number itself is split into two portions, a major 
+ * version and a minor version. 
+ * The last three bytes of a file are the major
+ * version and a single preceding byte is the minor number. The major version
+ * determines which readers/writers to use to read/write a hfile while a minor
+ * version determines smaller changes in hfile format that do not need a new
+ * reader/writer type.
  */
 @InterfaceAudience.Private
 public class FixedFileTrailer {
@@ -108,12 +115,16 @@ public class FixedFileTrailer {
   /** Raw key comparator class name in version 2 */
   private String comparatorClassName = RawComparator.class.getName();
 
-  /** The {@link HFile} format version. */
-  private final int version;
+  /** The {@link HFile} format major version. */
+  private final int majorVersion;
 
-  FixedFileTrailer(int version) {
-    this.version = version;
-    HFile.checkFormatVersion(version);
+  /** The {@link HFile} format minor version. */
+  private final int minorVersion;
+
+  FixedFileTrailer(int majorVersion, int minorVersion) {
+    this.majorVersion = majorVersion;
+    this.minorVersion = minorVersion;
+    HFile.checkFormatVersion(majorVersion);
   }
 
   private static int[] computeTrailerSizeByVersion() {
@@ -121,7 +132,8 @@ public class FixedFileTrailer {
     for (int version = MIN_FORMAT_VERSION;
          version <= MAX_FORMAT_VERSION;
          ++version) {
-      FixedFileTrailer fft = new FixedFileTrailer(version);
+      FixedFileTrailer fft = new FixedFileTrailer(version, 
+                                   HFileBlock.MINOR_VERSION_NO_CHECKSUM);
       DataOutputStream dos = new DataOutputStream(new NullOutputStream());
       try {
         fft.serialize(dos);
@@ -151,7 +163,7 @@ public class FixedFileTrailer {
   }
 
   public int getTrailerSize() {
-    return getTrailerSize(version);
+    return getTrailerSize(majorVersion);
   }
 
   /**
@@ -163,7 +175,7 @@ public class FixedFileTrailer {
    * @throws IOException
    */
   void serialize(DataOutputStream outputStream) throws IOException {
-    HFile.checkFormatVersion(version);
+    HFile.checkFormatVersion(majorVersion);
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutput baosDos = new DataOutputStream(baos);
@@ -173,7 +185,7 @@ public class FixedFileTrailer {
     baosDos.writeLong(loadOnOpenDataOffset);
     baosDos.writeInt(dataIndexCount);
 
-    if (version == 1) {
+    if (majorVersion == 1) {
       // This used to be metaIndexOffset, but it was not used in version 1.
       baosDos.writeLong(0);
     } else {
@@ -182,7 +194,7 @@ public class FixedFileTrailer {
 
     baosDos.writeInt(metaIndexCount);
     baosDos.writeLong(totalUncompressedBytes);
-    if (version == 1) {
+    if (majorVersion == 1) {
       baosDos.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
     } else {
       // This field is long from version 2 onwards.
@@ -190,14 +202,16 @@ public class FixedFileTrailer {
     }
     baosDos.writeInt(compressionCodec.ordinal());
 
-    if (version > 1) {
+    if (majorVersion > 1) {
       baosDos.writeInt(numDataIndexLevels);
       baosDos.writeLong(firstDataBlockOffset);
       baosDos.writeLong(lastDataBlockOffset);
       Bytes.writeStringFixedSize(baosDos, comparatorClassName,
           MAX_COMPARATOR_NAME_LENGTH);
     }
-    baosDos.writeInt(version);
+
+    // serialize the major and minor versions
+    baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
 
     outputStream.write(baos.toByteArray());
   }
@@ -212,7 +226,7 @@ public class FixedFileTrailer {
    * @throws IOException
    */
   void deserialize(DataInputStream inputStream) throws IOException {
-    HFile.checkFormatVersion(version);
+    HFile.checkFormatVersion(majorVersion);
 
     BlockType.TRAILER.readAndCheck(inputStream);
 
@@ -220,7 +234,7 @@ public class FixedFileTrailer {
     loadOnOpenDataOffset = inputStream.readLong();
     dataIndexCount = inputStream.readInt();
 
-    if (version == 1) {
+    if (majorVersion == 1) {
       inputStream.readLong(); // Read and skip metaIndexOffset.
     } else {
       uncompressedDataIndexSize = inputStream.readLong();
@@ -228,9 +242,9 @@ public class FixedFileTrailer {
     metaIndexCount = inputStream.readInt();
 
     totalUncompressedBytes = inputStream.readLong();
-    entryCount = version == 1 ? inputStream.readInt() : inputStream.readLong();
+    entryCount = majorVersion == 1 ? inputStream.readInt() : inputStream.readLong();
     compressionCodec = Compression.Algorithm.values()[inputStream.readInt()];
-    if (version > 1) {
+    if (majorVersion > 1) {
       numDataIndexLevels = inputStream.readInt();
       firstDataBlockOffset = inputStream.readLong();
       lastDataBlockOffset = inputStream.readLong();
@@ -238,7 +252,9 @@ public class FixedFileTrailer {
           Bytes.readStringFixedSize(inputStream, MAX_COMPARATOR_NAME_LENGTH);
     }
 
-    expectVersion(inputStream.readInt());
+    int version = inputStream.readInt();
+    expectMajorVersion(extractMajorVersion(version));
+    expectMinorVersion(extractMinorVersion(version));
   }
 
   private void append(StringBuilder sb, String s) {
@@ -257,14 +273,15 @@ public class FixedFileTrailer {
     append(sb, "totalUncomressedBytes=" + totalUncompressedBytes);
     append(sb, "entryCount=" + entryCount);
     append(sb, "compressionCodec=" + compressionCodec);
-    if (version == 2) {
+    if (majorVersion == 2) {
       append(sb, "uncompressedDataIndexSize=" + uncompressedDataIndexSize);
       append(sb, "numDataIndexLevels=" + numDataIndexLevels);
       append(sb, "firstDataBlockOffset=" + firstDataBlockOffset);
       append(sb, "lastDataBlockOffset=" + lastDataBlockOffset);
       append(sb, "comparatorClassName=" + comparatorClassName);
     }
-    append(sb, "version=" + version);
+    append(sb, "majorVersion=" + majorVersion);
+    append(sb, "minorVersion=" + minorVersion);
 
     return sb.toString();
   }
@@ -301,31 +318,44 @@ public class FixedFileTrailer {
     buf.position(buf.limit() - Bytes.SIZEOF_INT);
     int version = buf.getInt();
 
+    // Extract the major and minor versions.
+    int majorVersion = extractMajorVersion(version);
+    int minorVersion = extractMinorVersion(version);
+
     try {
-      HFile.checkFormatVersion(version);
+      HFile.checkFormatVersion(majorVersion);
     } catch (IllegalArgumentException iae) {
       // In this context, an invalid version might indicate a corrupt HFile.
       throw new IOException(iae);
     }
 
-    int trailerSize = getTrailerSize(version);
+    int trailerSize = getTrailerSize(majorVersion);
 
-    FixedFileTrailer fft = new FixedFileTrailer(version);
+    FixedFileTrailer fft = new FixedFileTrailer(majorVersion, minorVersion);
     fft.deserialize(new DataInputStream(new ByteArrayInputStream(buf.array(),
         buf.arrayOffset() + bufferSize - trailerSize, trailerSize)));
     return fft;
   }
 
-  public void expectVersion(int expected) {
-    if (version != expected) {
-      throw new IllegalArgumentException("Invalid HFile version: " + version
+  public void expectMajorVersion(int expected) {
+    if (majorVersion != expected) {
+      throw new IllegalArgumentException("Invalid HFile major version: "
+          + majorVersion 
           + " (expected: " + expected + ")");
     }
   }
 
-  public void expectAtLeastVersion(int lowerBound) {
-    if (version < lowerBound) {
-      throw new IllegalArgumentException("Invalid HFile version: " + version
+  public void expectMinorVersion(int expected) {
+    if (minorVersion != expected) {
+      throw new IllegalArgumentException("Invalid HFile minor version: "
+          + minorVersion + " (expected: " + expected + ")");
+    }
+  }
+
+  public void expectAtLeastMajorVersion(int lowerBound) {
+    if (majorVersion < lowerBound) {
+      throw new IllegalArgumentException("Invalid HFile major version: "
+          + majorVersion
           + " (expected: " + lowerBound + " or higher).");
     }
   }
@@ -375,11 +405,11 @@ public class FixedFileTrailer {
   }
 
   public void setEntryCount(long newEntryCount) {
-    if (version == 1) {
+    if (majorVersion == 1) {
       int intEntryCount = (int) Math.min(Integer.MAX_VALUE, newEntryCount);
       if (intEntryCount != newEntryCount) {
         LOG.info("Warning: entry count is " + newEntryCount + " but writing "
-            + intEntryCount + " into the version " + version + " trailer");
+            + intEntryCount + " into the version " + majorVersion + " trailer");
       }
       entryCount = intEntryCount;
       return;
@@ -396,42 +426,52 @@ public class FixedFileTrailer {
   }
 
   public int getNumDataIndexLevels() {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     return numDataIndexLevels;
   }
 
   public void setNumDataIndexLevels(int numDataIndexLevels) {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     this.numDataIndexLevels = numDataIndexLevels;
   }
 
   public long getLastDataBlockOffset() {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     return lastDataBlockOffset;
   }
 
   public void setLastDataBlockOffset(long lastDataBlockOffset) {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     this.lastDataBlockOffset = lastDataBlockOffset;
   }
 
   public long getFirstDataBlockOffset() {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     return firstDataBlockOffset;
   }
 
   public void setFirstDataBlockOffset(long firstDataBlockOffset) {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     this.firstDataBlockOffset = firstDataBlockOffset;
   }
 
-  public int getVersion() {
-    return version;
+  /**
+   * Returns the major version of this HFile format
+   */
+  public int getMajorVersion() {
+    return majorVersion;
+  }
+
+  /**
+   * Returns the minor version of this HFile format
+   */
+  int getMinorVersion() {
+    return minorVersion;
   }
 
   @SuppressWarnings("rawtypes")
   public void setComparatorClass(Class<? extends RawComparator> klass) {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     comparatorClassName = klass.getName();
   }
 
@@ -458,20 +498,43 @@ public class FixedFileTrailer {
   }
 
   RawComparator<byte[]> createComparator() throws IOException {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     return createComparator(comparatorClassName);
   }
 
   public long getUncompressedDataIndexSize() {
-    if (version == 1)
+    if (majorVersion == 1)
       return 0;
     return uncompressedDataIndexSize;
   }
 
   public void setUncompressedDataIndexSize(
       long uncompressedDataIndexSize) {
-    expectAtLeastVersion(2);
+    expectAtLeastMajorVersion(2);
     this.uncompressedDataIndexSize = uncompressedDataIndexSize;
   }
 
+  /**
+   * Extracts the major version from a 4-byte serialized version.
+   * The major version is stored in the 3 least significant bytes.
+   */
+  private static int extractMajorVersion(int serializedVersion) {
+    return (serializedVersion & 0x00ffffff);
+  }
+
+  /**
+   * Extracts the minor version from a 4-byte serialized version.
+   * The minor version is stored in the most significant byte.
+   */
+  private static int extractMinorVersion(int serializedVersion) {
+    return (serializedVersion >>> 24);
+  }
+
+  /**
+   * Create a 4 byte serialized version number by combining the
+   * minor and major version numbers.
+   */
+  private static int materializeVersion(int majorVersion, int minorVersion) {
+    return ((majorVersion & 0x00ffffff) | (minorVersion << 24));
+  }
 }
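
The version packing introduced in the FixedFileTrailer diff above can be tried in isolation. The following is a minimal sketch, assuming the same bit layout as the patch (minor version in the most significant byte, major version in the low three bytes). The class name VersionPacking is hypothetical, and the helpers are re-declared here as public statics only because the originals are private to FixedFileTrailer.

```java
public class VersionPacking {

  // Combine major and minor into one 4-byte value, mirroring the
  // materializeVersion helper in the patch.
  public static int materializeVersion(int majorVersion, int minorVersion) {
    return (majorVersion & 0x00ffffff) | (minorVersion << 24);
  }

  // The major version occupies the 3 least significant bytes.
  public static int extractMajorVersion(int serializedVersion) {
    return serializedVersion & 0x00ffffff;
  }

  // The minor version occupies the most significant byte; the unsigned
  // shift keeps the result non-negative for minor versions of 128 and up.
  public static int extractMinorVersion(int serializedVersion) {
    return serializedVersion >>> 24;
  }

  public static void main(String[] args) {
    // Major version 2 with minor version 1 (checksums present).
    int withChecksum = materializeVersion(2, 1);
    System.out.println(String.format("0x%08x", withChecksum));

    // A trailer written before this change stores the plain int 2,
    // which decodes as major 2, minor 0.
    int legacy = 2;
    System.out.println(extractMajorVersion(legacy) + "." + extractMinorVersion(legacy));
  }
}
```

Because older trailers wrote the major version as a plain int with a zero high byte, they decode as minor version 0 without any migration step, which appears to be why the patch could add a minor version without changing the trailer size.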

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1298641&r1=1298640&r2=1298641&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Mar  8 22:55:49 2012
@@ -43,10 +43,12 @@ import org.apache.hadoop.hbase.HColumnDe
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
+import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -156,6 +158,12 @@ public class HFile {
    */
   public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;
 
+  /**
+   * The number of bytes per checksum.
+   */
+  public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
+  public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;
+
   // For measuring latency of "sequential" reads and writes
   static final AtomicInteger readOps = new AtomicInteger();
   static final AtomicLong readTimeNano = new AtomicLong();
@@ -166,6 +174,9 @@ public class HFile {
   static final AtomicInteger preadOps = new AtomicInteger();
   static final AtomicLong preadTimeNano = new AtomicLong();
 
+  // For measuring number of checksum failures
+  static final AtomicLong checksumFailures = new AtomicLong();
+
   // for test purpose
   public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0);
 
@@ -195,6 +206,14 @@ public class HFile {
     return writeTimeNano.getAndSet(0) / 1000000;
   }
 
+  /**
+   * Returns the number of checksum verification failures recorded since the
+   * previous call, and resets the counter to zero.
+   */
+  public static final long getChecksumFailuresCount() {
+    return checksumFailures.getAndSet(0);
+  }
+
   /** API required to write an {@link HFile} */
   public interface Writer extends Closeable {
 
@@ -247,6 +266,8 @@ public class HFile {
         HFile.DEFAULT_COMPRESSION_ALGORITHM;
     protected HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
     protected KeyComparator comparator;
+    protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
+    protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -296,6 +317,17 @@ public class HFile {
       return this;
     }
 
+    public WriterFactory withChecksumType(ChecksumType checksumType) {
+      Preconditions.checkNotNull(checksumType);
+      this.checksumType = checksumType;
+      return this;
+    }
+
+    public WriterFactory withBytesPerChecksum(int bytesPerChecksum) {
+      this.bytesPerChecksum = bytesPerChecksum;
+      return this;
+    }
+
     public Writer create() throws IOException {
       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
         throw new AssertionError("Please specify exactly one of " +
@@ -305,14 +337,15 @@ public class HFile {
         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
       }
       return createWriter(fs, path, ostream, blockSize,
-          compression, encoder, comparator);
+          compression, encoder, comparator, checksumType, bytesPerChecksum);
     }
 
     protected abstract Writer createWriter(FileSystem fs, Path path,
         FSDataOutputStream ostream, int blockSize,
         Compression.Algorithm compress,
         HFileDataBlockEncoder dataBlockEncoder,
-        KeyComparator comparator) throws IOException;
+        KeyComparator comparator, ChecksumType checksumType,
+        int bytesPerChecksum) throws IOException;
   }
 
   /** The configuration key for HFile version to use for new files */
@@ -431,20 +464,22 @@ public class HFile {
   }
 
   private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
+      FSDataInputStream fsdisNoFsChecksum,
       long size, boolean closeIStream, CacheConfig cacheConf,
-      DataBlockEncoding preferredEncodingInCache)
+      DataBlockEncoding preferredEncodingInCache, HFileSystem hfs)
       throws IOException {
     FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
-    switch (trailer.getVersion()) {
+    switch (trailer.getMajorVersion()) {
     case 1:
       return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
           cacheConf);
     case 2:
-      return new HFileReaderV2(path, trailer, fsdis, size, closeIStream,
-          cacheConf, preferredEncodingInCache);
+      return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
+          size, closeIStream,
+          cacheConf, preferredEncodingInCache, hfs);
     default:
       throw new IOException("Cannot instantiate reader for HFile version " +
-          trailer.getVersion());
+          trailer.getMajorVersion());
     }
   }
 
@@ -452,9 +487,26 @@ public class HFile {
       FileSystem fs, Path path, CacheConfig cacheConf,
       DataBlockEncoding preferredEncodingInCache) throws IOException {
     final boolean closeIStream = true;
-    return pickReaderVersion(path, fs.open(path),
+    HFileSystem hfs = null;
+    FSDataInputStream fsdis = fs.open(path);
+    FSDataInputStream fsdisNoFsChecksum = fsdis;
+    // If the fs is not an instance of HFileSystem, then create an 
+    // instance of HFileSystem that wraps over the specified fs.
+    // In this case, we will not be able to avoid checksumming inside
+    // the filesystem.
+    if (!(fs instanceof HFileSystem)) {
+      hfs = new HFileSystem(fs);
+    } else {
+      hfs = (HFileSystem)fs;
+      // open a stream to read data without checksum verification in
+      // the filesystem
+      if (hfs != null) {
+        fsdisNoFsChecksum = hfs.getNoChecksumFs().open(path);
+      }
+    }
+    return pickReaderVersion(path, fsdis, fsdisNoFsChecksum,
         fs.getFileStatus(path).getLen(), closeIStream, cacheConf,
-        preferredEncodingInCache);
+        preferredEncodingInCache, hfs);
   }
 
   public static Reader createReader(
@@ -463,12 +515,15 @@ public class HFile {
         DataBlockEncoding.NONE);
   }
 
-  public static Reader createReaderFromStream(Path path,
+  /**
+   * This factory method is used only by unit tests
+   */
+  static Reader createReaderFromStream(Path path,
       FSDataInputStream fsdis, long size, CacheConfig cacheConf)
       throws IOException {
     final boolean closeIStream = false;
-    return pickReaderVersion(path, fsdis, size, closeIStream, cacheConf,
-        DataBlockEncoding.NONE);
+    return pickReaderVersion(path, fsdis, fsdis, size, closeIStream, cacheConf,
+        DataBlockEncoding.NONE, null);
   }
 
   /*


