hbase-commits mailing list archives

From: ser...@apache.org
Subject: svn commit: r1476677 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/io/hfile/
Date: Sat, 27 Apr 2013 21:34:44 GMT
Author: sershe
Date: Sat Apr 27 21:34:43 2013
New Revision: 1476677

URL: http://svn.apache.org/r1476677
Log:
HBASE-7970 Improve file descriptor usage: currently, there are two file descriptors per storefile

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java?rev=1476677&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java Sat Apr 27 21:34:43 2013
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FileLink;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums,
+ * as well as closing streams. Initialization is not thread-safe, but normal operation is;
+ * see method comments.
+ */
+public class FSDataInputStreamWrapper {
+  private final HFileSystem hfs;
+  private final Path path;
+  private final FileLink link;
+  private final boolean doCloseStreams;
+
+  /** Two stream handles, one with and one without FS-level checksum.
+   * HDFS checksum setting is on FS level, not single read level, so you have to keep two
+   * FS objects and two handles open to interleave different reads freely, which is very sad.
+   * This is what we do:
+   * 1) First, we need to read the trailer of HFile to determine checksum parameters.
+   *  We always use FS checksum to do that, so ctor opens {@link #stream}.
+   * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream};
+   * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum},
+   *  and close {@link #stream}. User MUST call prepareForBlockReader for that to happen;
+   *  if they don't, (2.1) will be the default.
+   * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to
+   *  {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could
+   *  return both in one call). This stream is guaranteed to be set.
+   * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}.
+   * That will take lock, and open {@link #stream}. While this is going on, others will
+   * continue to use the old stream; if they also want to fall back, they'll also call
+   * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
+   * 5) After some number of checksumOk() calls, we will go back to using HBase checksum.
+   * We will have 2 handles; however we presume checksums fail so rarely that we don't care.
+   */
+  private volatile FSDataInputStream stream = null;
+  private volatile FSDataInputStream streamNoFsChecksum = null;
+  private final Object streamNoFsChecksumFirstCreateLock = new Object();
+
+  // The configuration states that we should validate hbase checksums
+  private boolean useHBaseChecksumConfigured;
+
+  // Record the current state of this reader with respect to
+  // validating checksums in HBase. This is originally set to the same
+  // value as useHBaseChecksumConfigured, but can change state as and when
+  // we encounter checksum verification failures.
+  private volatile boolean useHBaseChecksum;
+
+  // In the case of a checksum failure, do this many succeeding
+  // reads without hbase checksum verification.
+  private volatile int hbaseChecksumOffCount = -1;
+
+  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
+    this(fs, null, path);
+  }
+
+  public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
+    this(fs, link, null);
+  }
+
+  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
+    assert (path == null) != (link == null);
+    this.path = path;
+    this.link = link;
+    this.doCloseStreams = true;
+    // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
+    // that wraps over the specified fs. In this case, we will not be able to avoid
+    // checksumming inside the filesystem.
+    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
+
+    // Initially we are going to read the tail block. Open the reader w/FS checksum.
+    this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
+    this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+  }
+
+  /**
+   * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
+   * reads finish and before any other reads start (what happens in reality is we read the
+   * tail, then call this based on what's in the tail, then read blocks).
+   * @param forceNoHBaseChecksum Force not using HBase checksum.
+   */
+  public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
+    if (hfs == null) return;
+    assert this.stream != null && !this.useHBaseChecksumConfigured;
+    boolean useHBaseChecksum =
+        !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
+
+    if (useHBaseChecksum) {
+      FileSystem fsNc = hfs.getNoChecksumFs();
+      this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
+      this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
+      // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
+      this.stream.close();
+      this.stream = null;
+    }
+  }
+
+  /** For use in tests. */
+  @VisibleForTesting
+  public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
+    this(fsdis, fsdis);
+  }
+
+  /** For use in tests. */
+  @VisibleForTesting
+  public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
+    doCloseStreams = false;
+    stream = fsdis;
+    streamNoFsChecksum = noChecksum;
+    path = null;
+    link = null;
+    hfs = null;
+    useHBaseChecksumConfigured = useHBaseChecksum = false;
+  }
+
+  /**
+   * @return Whether we are presently using HBase checksum.
+   */
+  public boolean shouldUseHBaseChecksum() {
+    return this.useHBaseChecksum;
+  }
+
+  /**
+   * Get the stream to use. Thread-safe.
+   * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned
+   *  at some point in the past, otherwise the result is undefined.
+   */
+  public FSDataInputStream getStream(boolean useHBaseChecksum) {
+    return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
+  }
+
+  /**
+   * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
+   * @param offCount For how many checksumOk calls to turn off the HBase checksum.
+   */
+  public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
+    // hbaseChecksumOffCount is speculative, but let's try to reset it less.
+    boolean partOfConvoy = false;
+    if (this.stream == null) {
+      synchronized (streamNoFsChecksumFirstCreateLock) {
+        partOfConvoy = (this.stream != null);
+        if (!partOfConvoy) {
+          this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+        }
+      }
+    }
+    if (!partOfConvoy) {
+      this.useHBaseChecksum = false;
+      this.hbaseChecksumOffCount = offCount;
+    }
+    return this.stream;
+  }
+
+  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
+  public void checksumOk() {
+    if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
+        && (this.hbaseChecksumOffCount-- < 0)) {
+      // The stream we need is already open (because we were using HBase checksum in the past).
+      assert this.streamNoFsChecksum != null;
+      this.useHBaseChecksum = true;
+    }
+  }
+
+  /** Close stream(s) if necessary. */
+  public void close() throws IOException {
+    if (!doCloseStreams) return;
+    try {
+      if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
+        streamNoFsChecksum.close();
+        streamNoFsChecksum = null;
+      }
+    } finally {
+      if (stream != null) {
+        stream.close();
+        stream = null;
+      }
+    }
+  }
+
+  public HFileSystem getHfs() {
+    return this.hfs;
+  }
+}
\ No newline at end of file
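
For context, the protocol described in the class comment boils down to a short caller-side lifecycle. Below is a rough usage sketch, not code from this commit; fs, path, and the trailer/block parsing are assumed to exist at the call site:

    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, path);
    try {
      // 1) The constructor opened the FS-checksum stream; read the trailer from it.
      FSDataInputStream trailerStream = wrapper.getStream(false);
      // ... parse the fixed file trailer from trailerStream ...

      // 2) Based on the trailer, switch to the no-FS-checksum stream if HBase
      //    checksums are usable (step 2.2 in the class comment).
      wrapper.prepareForBlockReader(false);

      // 3) For each block read, pair the flag with the matching stream.
      boolean useHBaseChecksum = wrapper.shouldUseHBaseChecksum();
      FSDataInputStream blockStream = wrapper.getStream(useHBaseChecksum);
      // ... read blocks from blockStream, verifying HBase checksums if requested ...
    } finally {
      wrapper.close();
    }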

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java Sat Apr 27 21:34:43 2013
@@ -87,18 +87,17 @@ public class HalfStoreFileReader extends
    * Creates a half file reader for a hfile referred to by an hfilelink.
   * @param fs filesystem to read from
    * @param p path to hfile
-   * @param in {@link FSDataInputStream}
-   * @param inNoChecksum {@link FSDataInputStream} opened on a filesystem without checksum
+   * @param in {@link FSDataInputStreamWrapper}
    * @param size Full size of the hfile file
    * @param cacheConf
    * @param r original reference file (contains top or bottom)
    * @param preferredEncodingInCache
    * @throws IOException
    */
-  public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStream in,
-      final FSDataInputStream inNoChecksum, long size, final CacheConfig cacheConf,
-      final Reference r, final DataBlockEncoding preferredEncodingInCache) throws IOException {
-    super(fs, p, in, inNoChecksum, size, cacheConf, preferredEncodingInCache, true);
+  public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in,
+      long size, final CacheConfig cacheConf,  final Reference r,
+      final DataBlockEncoding preferredEncodingInCache) throws IOException {
+    super(fs, p, in, size, cacheConf, preferredEncodingInCache);
     // This is not the actual midkey for this half-file; it's just the border
     // around which we split top and bottom.  Have to look in files to find
     // actual last and first keys for bottom and top halves.  Half-files don't

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Sat Apr 27 21:34:43 2013
@@ -36,10 +36,6 @@ import org.apache.hadoop.io.RawComparato
  */
 @InterfaceAudience.Private
 public abstract class AbstractHFileReader implements HFile.Reader {
-
-  /** Filesystem-level block reader for this HFile format version. */
-  protected HFileBlock.FSReader fsBlockReader;
-
   /** Stream to read from. Does checksum verifications in file system */
   protected FSDataInputStream istream;
 
@@ -47,12 +43,6 @@ public abstract class AbstractHFileReade
    * does not do checksum verification in the file system */
   protected FSDataInputStream istreamNoFsChecksum;
 
-  /**
-   * True if we should close the input stream when done. We don't close it if we
-   * didn't open it.
-   */
-  protected final boolean closeIStream;
-
   /** Data block index reader keeping the root data index in memory */
   protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
 
@@ -101,27 +91,14 @@ public abstract class AbstractHFileReade
   protected HFileSystem hfs;
 
   protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
-      final FSDataInputStream fsdis, final long fileSize,
-      final boolean closeIStream,
-      final CacheConfig cacheConf) {
-    this(path, trailer, fsdis, fsdis, fileSize, closeIStream, cacheConf, null);
-  }
-
-  protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
-      final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
-      final long fileSize,
-      final boolean closeIStream,
-      final CacheConfig cacheConf, final HFileSystem hfs) {
+      final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs) {
     this.trailer = trailer;
     this.compressAlgo = trailer.getCompressionCodec();
     this.cacheConf = cacheConf;
     this.fileSize = fileSize;
-    this.istream = fsdis;
-    this.closeIStream = closeIStream;
     this.path = path;
     this.name = path.getName();
     this.hfs = hfs;
-    this.istreamNoFsChecksum = fsdisNoFsChecksum;
   }
 
   @SuppressWarnings("serial")
@@ -341,9 +318,7 @@ public abstract class AbstractHFileReade
   }
 
   /** For testing */
-  HFileBlock.FSReader getUncachedBlockReader() {
-    return fsBlockReader;
-  }
+  abstract HFileBlock.FSReader getUncachedBlockReader();
 
   public Path getPath() {
     return path;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Sat Apr 27 21:34:43 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.regionserver.StoreFile.WriterBuilder;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -543,32 +544,29 @@ public class HFile {
    * TODO This is a bad abstraction.  See HBASE-6635.
    *
    * @param path hfile's path
-   * @param fsdis an open checksummed stream of path's file
-   * @param fsdisNoFsChecksum an open unchecksummed stream of path's file
+   * @param fsdis stream of path's file
    * @param size max size of the trailer.
-   * @param closeIStream boolean for closing file after the getting the reader version.
   * @param cacheConf Cache configuration values, cannot be null.
    * @param preferredEncodingInCache
    * @param hfs
    * @return an appropriate instance of HFileReader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
-  private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
-      FSDataInputStream fsdisNoFsChecksum,
-      long size, boolean closeIStream, CacheConfig cacheConf,
-      DataBlockEncoding preferredEncodingInCache, HFileSystem hfs)
-      throws IOException {
+  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
+      long size, CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
+      HFileSystem hfs) throws IOException {
     FixedFileTrailer trailer = null;
     try {
-      trailer = FixedFileTrailer.readFromStream(fsdis, size);
+      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
+      assert !isHBaseChecksum; // Initially we must read with FS checksum.
+      trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
     } catch (IllegalArgumentException iae) {
       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
     }
     switch (trailer.getMajorVersion()) {
     case 2:
-      return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
-          size, closeIStream,
-          cacheConf, preferredEncodingInCache, hfs);
+      return new HFileReaderV2(
+          path, trailer, fsdis, size, cacheConf, preferredEncodingInCache, hfs);
     default:
       throw new CorruptHFileException("Invalid HFile version " + trailer.getMajorVersion());
     }
@@ -586,43 +584,24 @@ public class HFile {
       FileSystem fs, Path path, CacheConfig cacheConf,
       DataBlockEncoding preferredEncodingInCache) throws IOException {
     final boolean closeIStream = true;
-    HFileSystem hfs = null;
-    FSDataInputStream fsdis = fs.open(path);
-    FSDataInputStream fsdisNoFsChecksum = fsdis;
-    // If the fs is not an instance of HFileSystem, then create an 
-    // instance of HFileSystem that wraps over the specified fs.
-    // In this case, we will not be able to avoid checksumming inside
-    // the filesystem.
-    if (!(fs instanceof HFileSystem)) {
-      hfs = new HFileSystem(fs);
-    } else {
-      hfs = (HFileSystem)fs;
-      // open a stream to read data without checksum verification in
-      // the filesystem
-      fsdisNoFsChecksum = hfs.getNoChecksumFs().open(path);
-    }
-    return pickReaderVersion(path, fsdis, fsdisNoFsChecksum,
-        fs.getFileStatus(path).getLen(), closeIStream, cacheConf,
-        preferredEncodingInCache, hfs);
+    FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
+    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
+        cacheConf, preferredEncodingInCache, stream.getHfs());
   }
 
   /**
    * @param fs A file system
    * @param path Path to HFile
-   * @param fsdis an open checksummed stream of path's file
-   * @param fsdisNoFsChecksum an open unchecksummed stream of path's file
+   * @param fsdis a stream of path's file
    * @param size max size of the trailer.
    * @param cacheConf Cache configuration for hfile's contents
    * @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
-   * @param closeIStream boolean for closing file after the getting the reader version.
    * @return A version specific Hfile Reader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
-  public static Reader createReaderWithEncoding(
-      FileSystem fs, Path path, FSDataInputStream fsdis,
-      FSDataInputStream fsdisNoFsChecksum, long size, CacheConfig cacheConf,
-      DataBlockEncoding preferredEncodingInCache, boolean closeIStream)
-      throws IOException {
+  public static Reader createReaderWithEncoding(FileSystem fs, Path path,
+      FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache) throws IOException {
     HFileSystem hfs = null;
 
     // If the fs is not an instance of HFileSystem, then create an
@@ -634,9 +613,7 @@ public class HFile {
     } else {
       hfs = (HFileSystem)fs;
     }
-    return pickReaderVersion(path, fsdis, fsdisNoFsChecksum, size,
-                             closeIStream, cacheConf,
-                             preferredEncodingInCache, hfs);
+    return pickReaderVersion(path, fsdis, size, cacheConf, preferredEncodingInCache, hfs);
   }
 
   /**
@@ -660,9 +637,8 @@ public class HFile {
   static Reader createReaderFromStream(Path path,
       FSDataInputStream fsdis, long size, CacheConfig cacheConf)
       throws IOException {
-    final boolean closeIStream = false;
-    return pickReaderVersion(path, fsdis, fsdis, size, closeIStream, cacheConf,
-        DataBlockEncoding.NONE, null);
+    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
+    return pickReaderVersion(path, wrapper, size, cacheConf, DataBlockEncoding.NONE, null);
   }
 
   /**
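
For callers, the change collapses the old (fsdis, fsdisNoFsChecksum, closeIStream) triple into a single wrapper argument. A minimal sketch of the new call shape, assuming fs, path, and cacheConf are in scope (illustration only, not code from this commit):

    FSDataInputStreamWrapper in = new FSDataInputStreamWrapper(fs, path);
    long size = fs.getFileStatus(path).getLen();
    HFile.Reader reader = HFile.createReaderWithEncoding(
        fs, path, in, size, cacheConf, DataBlockEncoding.NONE);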

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Sat Apr 27 21:34:43 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -1122,6 +1123,9 @@ public class HFileBlock implements Cache
      * @return an iterator of blocks between the two given offsets
      */
     BlockIterator blockRange(long startOffset, long endOffset);
+
+    /** Closes the backing streams */
+    void closeStreams() throws IOException;
   }
 
   /**
@@ -1129,15 +1133,6 @@ public class HFileBlock implements Cache
    * tools for implementing HFile format version-specific block readers.
    */
   private abstract static class AbstractFSReader implements FSReader {
-
-    /** The file system stream of the underlying {@link HFile} that 
-     * does checksum validations in the filesystem */
-    protected final FSDataInputStream istream;
-
-    /** The file system stream of the underlying {@link HFile} that
-     * does not do checksum verification in the file system */
-    protected final FSDataInputStream istreamNoFsChecksum;
-
     /** Compression algorithm used by the {@link HFile} */
     protected Compression.Algorithm compressAlgo;
 
@@ -1161,19 +1156,14 @@ public class HFileBlock implements Cache
     /** The default buffer size for our buffered streams */
     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
 
-    public AbstractFSReader(FSDataInputStream istream, 
-        FSDataInputStream istreamNoFsChecksum,
-        Algorithm compressAlgo,
-        long fileSize, int minorVersion, HFileSystem hfs, Path path) 
-        throws IOException {
-      this.istream = istream;
+    public AbstractFSReader(Algorithm compressAlgo, long fileSize, int minorVersion,
+        HFileSystem hfs, Path path) throws IOException {
       this.compressAlgo = compressAlgo;
       this.fileSize = fileSize;
       this.minorVersion = minorVersion;
       this.hfs = hfs;
       this.path = path;
       this.hdrSize = headerSize(minorVersion);
-      this.istreamNoFsChecksum = istreamNoFsChecksum;
     }
 
     @Override
@@ -1277,22 +1267,6 @@ public class HFileBlock implements Cache
     }
 
     /**
-     * Creates a buffered stream reading a certain slice of the file system
-     * input stream. We need this because the decompression we use seems to
-     * expect the input stream to be bounded.
-     *
-     * @param offset the starting file offset the bounded stream reads from
-     * @param size the size of the segment of the file the stream should read
-     * @param pread whether to use position reads
-     * @return a stream restricted to the given portion of the file
-     */
-    protected InputStream createBufferedBoundedStream(long offset,
-        int size, boolean pread) {
-      return new BufferedInputStream(new BoundedRangeFileInputStream(istream,
-          offset, size, pread), Math.min(DEFAULT_BUFFER_SIZE, size));
-    }
-
-    /**
      * @return The minorVersion of this HFile
      */
     protected int getMinorVersion() {
@@ -1312,19 +1286,9 @@ public class HFileBlock implements Cache
 
   /** Reads version 2 blocks from the filesystem. */
   static class FSReaderV2 extends AbstractFSReader {
-
-    // The configuration states that we should validate hbase checksums
-    private final boolean useHBaseChecksumConfigured;
-
-    // Record the current state of this reader with respect to
-    // validating checkums in HBase. This is originally set the same
-    // value as useHBaseChecksumConfigured, but can change state as and when
-    // we encounter checksum verification failures.
-    private volatile boolean useHBaseChecksum;
-
-    // In the case of a checksum failure, do these many succeeding
-    // reads without hbase checksum verification.
-    private volatile int checksumOffCount = -1;
+    /** Wrapper for the file system streams of the underlying {@link HFile};
+     * reads may or may not do checksum validation in the filesystem. */
+    protected FSDataInputStreamWrapper streamWrapper;
 
     /** Whether we include memstore timestamp in data blocks */
     protected boolean includesMemstoreTS;
@@ -1345,30 +1309,14 @@ public class HFileBlock implements Cache
           }
         };
 
-    public FSReaderV2(FSDataInputStream istream, 
-        FSDataInputStream istreamNoFsChecksum, Algorithm compressAlgo,
-        long fileSize, int minorVersion, HFileSystem hfs, Path path) 
-      throws IOException {
-      super(istream, istreamNoFsChecksum, compressAlgo, fileSize, 
-            minorVersion, hfs, path);
+    public FSReaderV2(FSDataInputStreamWrapper stream, Algorithm compressAlgo, long fileSize,
+        int minorVersion, HFileSystem hfs, Path path) throws IOException {
+      super(compressAlgo, fileSize, minorVersion, hfs, path);
+      this.streamWrapper = stream;
+      // Older versions of HBase didn't support checksum.
+      boolean forceNoHBaseChecksum = (this.getMinorVersion() < MINOR_VERSION_WITH_CHECKSUM);
+      this.streamWrapper.prepareForBlockReader(forceNoHBaseChecksum);
 
-      if (hfs != null) {
-        // Check the configuration to determine whether hbase-level
-        // checksum verification is needed or not.
-        useHBaseChecksum = hfs.useHBaseChecksum();
-      } else {
-        // The configuration does not specify anything about hbase checksum
-        // validations. Set it to true here assuming that we will verify
-        // hbase checksums for all reads. For older files that do not have 
-        // stored checksums, this flag will be reset later.
-        useHBaseChecksum = true;
-      }
-
-      // for older versions, hbase did not store checksums.
-      if (getMinorVersion() < MINOR_VERSION_WITH_CHECKSUM) {
-        useHBaseChecksum = false;
-      }
-      this.useHBaseChecksumConfigured = useHBaseChecksum;
       defaultDecodingCtx =
         new HFileBlockDefaultDecodingContext(compressAlgo);
       encodedBlockDecodingCtx =
@@ -1381,7 +1329,7 @@ public class HFileBlock implements Cache
      */
     FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
         long fileSize) throws IOException {
-      this(istream, istream, compressAlgo, fileSize, 
+      this(new FSDataInputStreamWrapper(istream), compressAlgo, fileSize,
            HFileReaderV2.MAX_MINOR_VERSION, null, null);
     }
 
@@ -1400,20 +1348,14 @@ public class HFileBlock implements Cache
     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
         int uncompressedSize, boolean pread) throws IOException {
 
-      // It is ok to get a reference to the stream here without any
-      // locks because it is marked final.
-      FSDataInputStream is = this.istreamNoFsChecksum;
-
       // get a copy of the current state of whether to validate
       // hbase checksums or not for this read call. This is not 
      // thread-safe but the one constraint is that if we decide
       // to skip hbase checksum verification then we are 
       // guaranteed to use hdfs checksum verification.
-      boolean doVerificationThruHBaseChecksum = this.useHBaseChecksum;
-      if (!doVerificationThruHBaseChecksum) {
-        is = this.istream;
-      }
-                     
+      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
+      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
+
       HFileBlock blk = readBlockDataInternal(is, offset, 
                          onDiskSizeWithHeaderL, 
                          uncompressedSize, pread,
@@ -1434,17 +1376,15 @@ public class HFileBlock implements Cache
           throw new IOException(msg); // cannot happen case here
         }
         HFile.checksumFailures.incrementAndGet(); // update metrics
- 
+
         // If we have a checksum failure, we fall back into a mode where
         // the next few reads use HDFS level checksums. We aim to make the
         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
         // hbase checksum verification, but since this value is set without
         // holding any locks, it can so happen that we might actually do
         // a few more than precisely this number.
-        this.checksumOffCount = CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD;
-        this.useHBaseChecksum = false;
+        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
         doVerificationThruHBaseChecksum = false;
-        is = this.istream;
         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
                                     uncompressedSize, pread,
                                     doVerificationThruHBaseChecksum);
@@ -1469,11 +1409,7 @@ public class HFileBlock implements Cache
       // The decrementing of this.checksumOffCount is not thread-safe,
       // but it is harmless because eventually checksumOffCount will be
       // a negative number.
-      if (!this.useHBaseChecksum && this.useHBaseChecksumConfigured) {
-        if (this.checksumOffCount-- < 0) {
-          this.useHBaseChecksum = true; // auto re-enable hbase checksums
-        }
-      }
+      streamWrapper.checksumOk();
       return blk;
     }
 
@@ -1678,6 +1614,11 @@ public class HFileBlock implements Cache
       return ChecksumUtil.validateBlockChecksum(path, block,
                                                 data, hdrSize);
     }
+
+    @Override
+    public void closeStreams() throws IOException {
+      streamWrapper.close();
+    }
   }
 
   @Override
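
With the checksum state moved into the wrapper, the failure handling in readBlockData reduces to roughly the shape below. This is a simplified sketch of the hunk above, not verbatim code; the null check stands in for the real failure detection, and readBlockDataInternal is the existing private helper:

    boolean viaHBase = streamWrapper.shouldUseHBaseChecksum();
    FSDataInputStream is = streamWrapper.getStream(viaHBase);
    HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
        uncompressedSize, pread, viaHBase);
    if (blk == null && viaHBase) {
      // HBase checksum failed: reopen the FS-checksum stream and retry,
      // with HBase verification disabled for the next N reads.
      is = streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
      blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
          uncompressedSize, pread, false);
    }
    streamWrapper.checksumOk(); // each clean read votes to re-enable HBase checksums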

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Sat Apr 27 21:34:43 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.fs.HFileS
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.io.WritableUtils;
@@ -60,6 +61,9 @@ public class HFileReaderV2 extends Abstr
     return includesMemstoreTS;
   }
 
+  /** Filesystem-level block reader. */
+  private HFileBlock.FSReader fsBlockReader;
+
   /**
    * A "sparse lock" implementation allowing to lock on a particular block
    * identified by offset. The purpose of this is to avoid two clients loading
@@ -91,27 +95,21 @@ public class HFileReaderV2 extends Abstr
    *
    * @param path Path to HFile.
    * @param trailer File trailer.
-   * @param fsdis input stream. Caller is responsible for closing the passed
-   *          stream.
+   * @param fsdis input stream.
    * @param size Length of the stream.
-   * @param closeIStream Whether to close the stream.
    * @param cacheConf Cache configuration.
    * @param preferredEncodingInCache the encoding to use in cache in case we
    *          have a choice. If the file is already encoded on disk, we will
    *          still use its on-disk encoding in cache.
    */
   public HFileReaderV2(Path path, FixedFileTrailer trailer,
-      final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
-      final long size,
-      final boolean closeIStream, final CacheConfig cacheConf,
+      final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
       DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs)
       throws IOException {
-    super(path, trailer, fsdis, fsdisNoFsChecksum, size, 
-          closeIStream, cacheConf, hfs);
+    super(path, trailer, size, cacheConf, hfs);
     trailer.expectMajorVersion(2);
     validateMinorVersion(path, trailer.getMinorVersion());
     HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
-        fsdisNoFsChecksum,
         compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
     this.fsBlockReader = fsBlockReaderV2; // upcast
 
@@ -420,18 +418,16 @@ public class HFileReaderV2 extends Abstr
           + " block(s)");
       }
     }
-    if (closeIStream) {
-      if (istream != istreamNoFsChecksum && istreamNoFsChecksum != null) {
-        istreamNoFsChecksum.close();
-        istreamNoFsChecksum = null;
-      }
-      if (istream != null) {
-        istream.close();
-        istream = null;
-      }
-    }
+    fsBlockReader.closeStreams();
   }
 
+  /** For testing */
+  @Override
+  HFileBlock.FSReader getUncachedBlockReader() {
+    return fsBlockReader;
+  }
+
+
   protected abstract static class AbstractScannerV2
       extends AbstractHFileReader.Scanner {
     protected HFileBlock block;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sat Apr 27 21:34:43 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValue.MetaKeyComparator;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
@@ -1069,12 +1070,10 @@ public class StoreFile {
       bloomFilterType = BloomType.NONE;
     }
 
-    public Reader(FileSystem fs, Path path, FSDataInputStream in,
-        final FSDataInputStream inNoChecksum, long size,
-        CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
-        boolean closeIStream) throws IOException {
-      reader = HFile.createReaderWithEncoding(fs, path, in, inNoChecksum,
-                  size, cacheConf, preferredEncodingInCache, closeIStream);
+    public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
+        CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache) throws IOException {
+      reader = HFile.createReaderWithEncoding(
+          fs, path, in, size, cacheConf, preferredEncodingInCache);
       bloomFilterType = BloomType.NONE;
     }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java Sat Apr 27 21:34:43 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
@@ -157,49 +158,30 @@ public class StoreFileInfo {
    */
   public StoreFile.Reader open(final FileSystem fs, final CacheConfig cacheConf,
       final DataBlockEncoding dataBlockEncoding) throws IOException {
-    FSDataInputStream inNoChecksum = null;
-    FileSystem noChecksumFs = null;
-    FSDataInputStream in;
+    FSDataInputStreamWrapper in;
     FileStatus status;
 
-    if (fs instanceof HFileSystem) {
-      noChecksumFs = ((HFileSystem)fs).getNoChecksumFs();
+    if (this.link != null) {
+      // HFileLink
+      in = new FSDataInputStreamWrapper(fs, this.link);
+      status = this.link.getFileStatus(fs);
+    } else if (this.reference != null) {
+      // HFile Reference
+      Path referencePath = getReferredToFile(this.getPath());
+      in = new FSDataInputStreamWrapper(fs, referencePath);
+      status = fs.getFileStatus(referencePath);
+    } else {
+      in = new FSDataInputStreamWrapper(fs, this.getPath());
+      status = fileStatus;
     }
-
+    long length = status.getLen();
     if (this.reference != null) {
-      if (this.link != null) {
-        // HFileLink Reference
-        in = this.link.open(fs);
-        inNoChecksum = (noChecksumFs != null) ? this.link.open(noChecksumFs) : in;
-        status = this.link.getFileStatus(fs);
-      } else {
-        // HFile Reference
-        Path referencePath = getReferredToFile(this.getPath());
-        in = fs.open(referencePath);
-        inNoChecksum = (noChecksumFs != null) ? noChecksumFs.open(referencePath) : in;
-        status = fs.getFileStatus(referencePath);
-      }
-
       hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
-      return new HalfStoreFileReader(fs, this.getPath(), in, inNoChecksum, status.getLen(),
-          cacheConf, reference, dataBlockEncoding);
+      return new HalfStoreFileReader(
+          fs, this.getPath(), in, length, cacheConf, reference, dataBlockEncoding);
     } else {
-      if (this.link != null) {
-        // HFileLink
-        in = this.link.open(fs);
-        inNoChecksum = (noChecksumFs != null) ? link.open(noChecksumFs) : in;
-        status = this.link.getFileStatus(fs);
-      } else {
-        // HFile
-        status = fileStatus;
-        in = fs.open(this.getPath());
-        inNoChecksum = (noChecksumFs != null) ? noChecksumFs.open(this.getPath()) : in;
-      }
-
-      long length = status.getLen();
       hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
-      return new StoreFile.Reader(fs, this.getPath(), in, inNoChecksum, length,
-          cacheConf, dataBlockEncoding, true);
+      return new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding);
     }
   }
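
The net effect in StoreFileInfo.open(): all three cases (HFileLink, reference, plain HFile) now build the same wrapper up front, and both reader constructors consume it, so the tail of the method becomes uniform. Condensed sketch using the same names as the diff (the HDFS block distribution bookkeeping is omitted):

    long length = status.getLen();
    return (reference != null)
        ? new HalfStoreFileReader(fs, getPath(), in, length, cacheConf, reference, dataBlockEncoding)
        : new StoreFile.Reader(fs, getPath(), in, length, cacheConf, dataBlockEncoding);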
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java Sat Apr 27 21:34:43 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.SmallTest
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.*;
@@ -102,7 +103,7 @@ public class TestChecksum {
         assertEquals(true, hfs.useHBaseChecksum());
 
         // Do a read that purposely introduces checksum verification failures.
-        FSDataInputStream is = fs.open(path);
+        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
         HFileBlock.FSReader hbr = new FSReaderV2Test(is, algo,
             totalSize, HFile.MAX_FORMAT_VERSION, fs, path);
         HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
@@ -145,7 +146,7 @@ public class TestChecksum {
         // any retries within hbase. 
         HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
         assertEquals(false, newfs.useHBaseChecksum());
-        is = newfs.open(path);
+        is = new FSDataInputStreamWrapper(newfs, path);
         hbr = new FSReaderV2Test(is, algo,
             totalSize, HFile.MAX_FORMAT_VERSION, newfs, path);
         b = hbr.readBlockData(0, -1, -1, pread);
@@ -210,8 +211,8 @@ public class TestChecksum {
         // Read data back from file.
         FSDataInputStream is = fs.open(path);
         FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
-        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, nochecksum, 
-            algo, totalSize, HFile.MAX_FORMAT_VERSION, hfs, path);
+        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(
+            is, nochecksum), algo, totalSize, HFile.MAX_FORMAT_VERSION, hfs, path);
         HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
         is.close();
         b.sanityCheck();
@@ -250,19 +251,15 @@ public class TestChecksum {
     }
   }
 
-
   /**
    * A class that introduces hbase-checksum failures while 
   * reading data from hfiles. This should trigger the hdfs level
    * checksum validations.
    */
   static private class FSReaderV2Test extends HFileBlock.FSReaderV2 {
-
-    FSReaderV2Test(FSDataInputStream istream, Algorithm algo,
-                   long fileSize, int minorVersion, FileSystem fs,
-                   Path path) throws IOException {
-      super(istream, istream, algo, fileSize, minorVersion, 
-            (HFileSystem)fs, path);
+    public FSReaderV2Test(FSDataInputStreamWrapper istream, Algorithm algo, long fileSize,
+        int minorVersion, FileSystem fs, Path path) throws IOException {
+      super(istream, algo, fileSize, minorVersion, (HFileSystem)fs, path);
     }
 
     @Override

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java?rev=1476677&r1=1476676&r2=1476677&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java Sat Apr 27 21:34:43 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.io.compress.Compressor;
@@ -191,8 +192,8 @@ public class TestHFileBlockCompatibility
         os.close();
 
         FSDataInputStream is = fs.open(path);
-        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, is, algo,
-            totalSize, MINOR_VERSION, fs, path);
+        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
+            algo, totalSize, MINOR_VERSION, fs, path);
         HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
         is.close();
 
@@ -204,8 +205,8 @@ public class TestHFileBlockCompatibility
 
         if (algo == GZ) {
           is = fs.open(path);
-          hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize, MINOR_VERSION,
-                                          fs, path);
+          hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
+              algo, totalSize, MINOR_VERSION, fs, path);
           b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
                                 b.totalChecksumBytes(), -1, pread);
           assertEquals(blockStr, b.toString());
@@ -265,8 +266,8 @@ public class TestHFileBlockCompatibility
           os.close();
 
           FSDataInputStream is = fs.open(path);
-          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, is, algo,
-              totalSize, MINOR_VERSION, fs, path);
+          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
+              algo, totalSize, MINOR_VERSION, fs, path);
           hbr.setDataBlockEncoder(dataBlockEncoder);
           hbr.setIncludesMemstoreTS(includesMemstoreTS);
 


